addressed PR comments and added a few extra logs
Blazer-007 committed Oct 8, 2024
1 parent 46bd976 commit e1e6f57
Showing 6 changed files with 43 additions and 53 deletions.
@@ -72,7 +72,7 @@ public class IcebergOverwritePartitionsStep implements CommitStep {
* Constructs an {@code IcebergReplacePartitionsStep} with the specified parameters.
*
* @param destTableIdStr the identifier of the destination table as a string
* @param serializedDataFiles the serialized data files to be used for replacing partitions
* @param serializedDataFiles the serialized data files (from a {@code List<DataFile>}) to be used for replacing partitions
* @param properties the properties containing configuration
*/
public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, byte[] serializedDataFiles, Properties properties) {
@@ -106,7 +106,7 @@ public void execute() throws IOException {
);
Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
overwritePartitionsRetryer.call(() -> {
destTable.overwritePartitions(dataFiles, this.partitionColName, this.partitionValue);
destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue);
return null;
});
log.info("Overwriting Data files completed for partition {} with value {} for destination table : {} ",
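For context on the retry wiring above, a minimal sketch, assuming the guava-retrying library and an attempt limit of 3 (the limit the default-retry test further down also asserts); this is not Gobblin's actual createOverwritePartitionsRetryer implementation, and the helper name is illustrative:

// Sketch only: retries the partition overwrite on any exception, up to an assumed 3 attempts.
// Assumes this class sits in org.apache.gobblin.data.management.copy.iceberg so the
// protected overwritePartition method is visible.
package org.apache.gobblin.data.management.copy.iceberg;

import java.util.List;
import java.util.concurrent.TimeUnit;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import org.apache.iceberg.DataFile;

public class OverwriteRetrySketch {
  static void overwriteWithRetry(IcebergTable destTable, List<DataFile> dataFiles,
      String partitionColName, String partitionValue) throws Exception {
    Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder()
        .retryIfException()                                    // retry on any exception from the call
        .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
        .withStopStrategy(StopStrategies.stopAfterAttempt(3))  // assumed attempt limit
        .build();
    try {
      retryer.call(() -> {
        destTable.overwritePartition(dataFiles, partitionColName, partitionValue);
        return null;
      });
    } catch (RetryException e) {
      // attempts exhausted; surface as a RuntimeException, as the step's tests below expect
      throw new RuntimeException("Failed to overwrite partition after retries", e);
    }
  }
}

Retrying here is a reasonable design choice because Iceberg commits can fail transiently, for example when a concurrent writer advances the table snapshot between the read and the commit.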
@@ -17,6 +17,7 @@

package org.apache.gobblin.data.management.copy.iceberg;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -91,6 +92,9 @@ private Predicate<StructLike> createPartitionFilterPredicate() throws IcebergTab
srcTableMetadata,
supportedTransforms
);
Preconditions.checkArgument(partitionColumnIndex >= 0, String.format(
"Partition column %s not found in table %s",
this.partitionColumnName, this.getFileSetId()));
return new IcebergMatchesAnyPropNamePartitionFilterPredicate(partitionColumnIndex, this.partitionColValue);
}

@@ -152,6 +156,8 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException {
List<DataFile> destDataFiles = new ArrayList<>();
if (srcDataFiles.isEmpty()) {
log.warn("No data files found for partition col : {} with partition value : {} in source table : {}",
this.partitionColumnName, this.partitionColValue, this.getFileSetId());
return destDataFiles;
}
TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
@@ -179,6 +185,7 @@ private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws Iceb
.withPath(updatedDestFilePath.toString())
.build());
// Store the mapping of srcPath to destPath to be used in creating list of src file status to dest path
log.debug("Path changed from Src : {} to Dest : {}", srcFilePath, updatedDestFilePath);
srcPathToDestPath.put(new Path(srcFilePath), updatedDestFilePath);
if (growthMilestoneTracker.isAnotherMilestone(destDataFiles.size())) {
log.info("Generated {} destination data files", destDataFiles.size());
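The index guard added above feeds a predicate that matches partition rows by value. As a rough illustration only (not the actual IcebergMatchesAnyPropNamePartitionFilterPredicate, whose class and field names differ), a Predicate<StructLike> keyed on a partition column index could look like this:

// Illustrative only: matches partition rows whose value at a known partition-spec index
// equals the configured partition value.
import java.util.Objects;
import java.util.function.Predicate;
import org.apache.iceberg.StructLike;

public class PartitionValueMatchPredicate implements Predicate<StructLike> {
  private final int partitionColumnIndex;  // index resolved from the table's partition spec
  private final String expectedValue;      // e.g. the configured partition value

  public PartitionValueMatchPredicate(int partitionColumnIndex, String expectedValue) {
    this.partitionColumnIndex = partitionColumnIndex;
    this.expectedValue = expectedValue;
  }

  @Override
  public boolean test(StructLike partition) {
    Object value = partition.get(this.partitionColumnIndex, Object.class);
    return value != null && Objects.equals(value.toString(), this.expectedValue);
  }
}

The Preconditions check added in the diff exists precisely so that a misconfigured or unsupported partition column fails fast with a clear message instead of producing an index of -1 here.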
@@ -46,9 +46,6 @@ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties)
protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException {
// TODO: Add Validator for source and destination tables later
// TableMetadata srcTableMetadata = srcIcebergTable.accessTableMetadata();
// TableMetadata destTableMetadata = destIcebergTable.accessTableMetadata();
// IcebergTableMetadataValidator.validateSourceAndDestinationTablesMetadata(srcTableMetadata, destTableMetadata);

String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
ICEBERG_PARTITION_NAME_KEY);
@@ -88,11 +88,6 @@ public TableNotFoundException(TableIdentifier tableId) {
private final String catalogUri;
private final Table table;

@VisibleForTesting
IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri) {
this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri, null);
}

@VisibleForTesting
IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri, Table table) {
this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri, table);
@@ -265,23 +260,26 @@ public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> iceber
}

/**
* Overwrite partitions in the table with the specified list of data files.
* Overwrite the partition's data files in the table for the specified partition column name and partition value.
* <p>
* Overwrite partition replaces the partitions using the expression filter provided.
* Overwrite partition replaces the partition using the expression filter provided.
* </p>
* @param dataFiles the list of data files to replace partitions with
* @param partitionColName the partition column name whose data files are to be replaced
* @param partitionValue the partition column value on which data files will be replaced
*/
protected void overwritePartitions(List<DataFile> dataFiles, String partitionColName, String partitionValue) {
protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue)
throws TableNotFoundException {
if (dataFiles.isEmpty()) {
return;
}
log.info("SnapshotId before overwrite: {}", accessTableMetadata().currentSnapshot().snapshotId());
OverwriteFiles overwriteFiles = this.table.newOverwrite();
overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue));
dataFiles.forEach(overwriteFiles::addFile);
overwriteFiles.commit();
this.tableOps.refresh();
log.info("SnapshotId after overwrite: {}", accessTableMetadata().currentSnapshot().snapshotId());
}

}
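Stripped of the Gobblin wrapper, the Iceberg calls used above amount to the following standalone sketch against a raw org.apache.iceberg.Table; the table handle, column name, and value here are placeholders:

// Sketch only: overwrite one partition's files via Iceberg's OverwriteFiles API.
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

public class OverwriteByRowFilterSketch {
  static void overwritePartition(Table table, List<DataFile> replacementFiles,
      String partitionColName, String partitionValue) {
    OverwriteFiles overwrite = table.newOverwrite();
    // drop existing data files whose rows match the partition value ...
    overwrite.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue));
    // ... and add the replacement files, all in one atomic snapshot commit
    replacementFiles.forEach(overwrite::addFile);
    overwrite.commit();
    // re-read table state to observe the new snapshot id, mirroring the before/after logging above
    table.refresh();
    System.out.println("SnapshotId after overwrite: " + table.currentSnapshot().snapshotId());
  }
}

Using overwriteByRowFilter with an equality expression lets Iceberg drop the old files for that partition and add the new ones in a single snapshot, rather than issuing separate delete and append commits.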
@@ -86,10 +86,10 @@ public void testIsCompleted() {
@Test
public void testExecute() {
try {
Mockito.doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(), Mockito.anyString(),
Mockito.doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(),
Mockito.anyString());
mockIcebergOverwritePartitionsStep.execute();
Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartitions(Mockito.anyList(),
Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartition(Mockito.anyList(),
Mockito.anyString(), Mockito.anyString());
} catch (IOException e) {
Assert.fail(String.format("Unexpected IOException : %s", e));
@@ -100,51 +100,32 @@ public void testExecute() {
public void testExecuteWithRetry() {
try {
// first call throws an exception which will be retried and on the second call nothing happens
Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(),
Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(),
Mockito.anyString(), Mockito.anyString());
mockIcebergOverwritePartitionsStep.execute();
Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartitions(Mockito.anyList(),
Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartition(Mockito.anyList(),
Mockito.anyString(), Mockito.anyString());
} catch (IOException e) {
Assert.fail(String.format("Unexpected IOException : %s", e));
}
}

@Test
public void testExecuteWithDefaultRetry() {
public void testExecuteWithDefaultRetry() throws IcebergTable.TableNotFoundException {
try {
// Always throw exception
Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartitions(Mockito.anyList(),
Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
Mockito.anyString(), Mockito.anyString());
mockIcebergOverwritePartitionsStep.execute();
} catch (RuntimeException e) {
Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartitions(Mockito.anyList(),
Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartition(Mockito.anyList(),
Mockito.anyString(), Mockito.anyString());
assertRetryTimes(e, 3);
} catch (IOException e) {
Assert.fail(String.format("Unexpected IOException : %s", e));
}
}

/** Disabling this test to avoid interrupting thread */
@Test(enabled = false)
public void testExecuteWithRetryAndInterrupt() {
// first call throw exception which will be retried and on second call nothing happens
Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(),
Mockito.anyString(), Mockito.anyString());
Thread.currentThread().interrupt();
try {
mockIcebergOverwritePartitionsStep.execute();
Assert.fail("Expected Runtime Exception to be thrown");
} catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().startsWith(
String.format("Failed to overwrite partition for destination table : {%s} : (retried 1 times) ... then interrupted ", destTableIdStr)),
e.getMessage());
} catch (IOException e) {
Assert.fail("Expected Runtime Exception to be thrown");
}
}

@Test
public void testExecuteWithCustomRetryConfig() throws IOException {
int retryCount = 7;
@@ -156,11 +137,11 @@ public void testExecuteWithCustomRetryConfig() throws IOException {
Mockito.doReturn(mockIcebergCatalog).when(mockIcebergOverwritePartitionsStep).createDestinationCatalog();
try {
// Always throw exception
Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartitions(Mockito.anyList(),
Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
Mockito.anyString(), Mockito.anyString());
mockIcebergOverwritePartitionsStep.execute();
} catch (RuntimeException e) {
Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartitions(Mockito.anyList(),
Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartition(Mockito.anyList(),
Mockito.anyString(), Mockito.anyString());
assertRetryTimes(e, retryCount);
} catch (IOException e) {
@@ -45,6 +45,7 @@
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hive.HiveMetastoreTest;
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
import org.testng.Assert;
@@ -111,15 +112,17 @@ public void testGetCurrentSnapshotInfo() throws IOException {
);

initializeSnapshots(table, perSnapshotFilesets);
IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getCurrentSnapshotInfo();
IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri,
catalog.loadTable(tableId)).getCurrentSnapshotInfo();
verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, perSnapshotFilesets.size());
}

/** Verify failure when attempting to get current snapshot info for non-existent table */
@Test(expectedExceptions = IcebergTable.TableNotFoundException.class)
@Test(expectedExceptions = {IcebergTable.TableNotFoundException.class, NoSuchTableException.class})
public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId), catalogUri).getCurrentSnapshotInfo();
IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId), catalogUri,
catalog.loadTable(bogusTableId)).getCurrentSnapshotInfo();
Assert.fail("expected an exception when using table ID '" + bogusTableId + "'");
}

@@ -134,7 +137,8 @@ public void testGetAllSnapshotInfosIterator() throws IOException {
);

initializeSnapshots(table, perSnapshotFilesets);
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getAllSnapshotInfosIterator());
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");

for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -154,7 +158,8 @@ public void testGetIncrementalSnapshotInfosIterator() throws IOException {
);

initializeSnapshots(table, perSnapshotFilesets);
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator());
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");

for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -174,7 +179,8 @@ public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws IOExce
);

initializeSnapshots(table, perSnapshotFilesets);
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator());
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");

for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -205,7 +211,8 @@ public void testNewTablePropertiesAreRegistered() throws Exception {
TableIdentifier destTableId = TableIdentifier.of(dbName, "destTable");
catalog.createTable(destTableId, icebergSchema, null, destTableProperties);

IcebergTable destIcebergTable = new IcebergTable(destTableId, catalog.newTableOps(destTableId), catalogUri);
IcebergTable destIcebergTable = new IcebergTable(destTableId, catalog.newTableOps(destTableId), catalogUri,
catalog.loadTable(destTableId));
// Mock a source table with the same table UUID copying new properties
TableMetadata newSourceTableProperties = destIcebergTable.accessTableMetadata().replaceProperties(srcTableProperties);

@@ -339,7 +346,7 @@ protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc)
return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
}

@Test(enabled = false)
@Test(groups = {"disabledOnCI"})
public void testGetPartitionSpecificDataFiles() throws IOException {
TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);
@@ -371,8 +378,8 @@ public void testGetPartitionSpecificDataFiles() throws IOException {
catalog.dropTable(testTableId);
}

@Test(enabled = false)
public void testOverwritePartitions() throws IOException {
@Test(groups = {"disabledOnCI"})
public void testOverwritePartition() throws IOException {
TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);

@@ -409,7 +416,7 @@ public void testOverwritePartitions() throws IOException {

List<DataFile> dataFiles2 = getDataFiles(paths2, partitionDataList2);
// here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table
icebergTable.overwritePartitions(dataFiles2, "id", "2");
icebergTable.overwritePartition(dataFiles2, "id", "2");
List<String> expectedPaths2 = new ArrayList<>(paths);
expectedPaths2.addAll(paths2);
verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
@@ -421,7 +428,7 @@ public void testExecuteWithCustomRetryConfig() throws IOException {
// Reusing the same partition data to create data files with different paths
List<DataFile> dataFiles3 = getDataFiles(paths3, partitionDataList);
// here, since partition data with value 1 already exists, we expect it to get updated in the table with newer path
icebergTable.overwritePartitions(dataFiles3, "id", "1");
icebergTable.overwritePartition(dataFiles3, "id", "1");
List<String> expectedPaths3 = new ArrayList<>(paths2);
expectedPaths3.addAll(paths3);
verifyAnyOrder(expectedPaths3, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
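The tests above rely on a getDataFiles helper to fabricate partition-scoped files. A minimal sketch of how such data files could be built with Iceberg's DataFiles builder; the partition spec, file format, sizes, and partition path are illustrative assumptions, not the helper's actual values:

// Sketch only: build partition-scoped DataFile instances for a partitioned test table.
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

public class TestDataFileSketch {
  static List<DataFile> dataFilesFor(PartitionSpec spec, List<String> paths, String partitionPath) {
    List<DataFile> dataFiles = new ArrayList<>();
    for (String path : paths) {
      dataFiles.add(DataFiles.builder(spec)
          .withPath(path)                   // e.g. "/tmp/testTable/data/id=1/a.orc"
          .withFormat(FileFormat.ORC)
          .withPartitionPath(partitionPath) // e.g. "id=1", must match the table's partition spec
          .withFileSizeInBytes(8)
          .withRecordCount(1)
          .build());
    }
    return dataFiles;
  }
}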
