From 487921939f5141e286063daef144fdff1a466a03 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Mon, 4 Nov 2024 10:10:06 +0530 Subject: [PATCH 1/2] throw exception incase no snapshot found for copying --- .../data/management/copy/iceberg/IcebergTable.java | 3 ++- .../management/copy/iceberg/IcebergTableTest.java | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index e3ec46aa1e9..fed375374a9 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -239,7 +239,8 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst public List getPartitionSpecificDataFiles(Predicate icebergPartitionFilterPredicate) throws IOException { TableMetadata tableMetadata = accessTableMetadata(); - Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + Snapshot currentSnapshot = Optional.ofNullable(tableMetadata.currentSnapshot()) + .orElseThrow(() -> new IOException(String.format("~%s~ No snapshots found", tableId))); long currentSnapshotId = currentSnapshot.snapshotId(); List knownDataFiles = new ArrayList<>(); GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker(); diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java index 63aa27221b7..69a9688fb13 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java @@ -253,6 +253,18 @@ public void testGetPartitionSpecificDataFiles() throws IOException { Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); } + @Test + public void testGetPartitionSpecificDataFilesThrowsExceptionWhenNoSnapshotsFound() throws IOException { + IcebergTable icebergTable = new IcebergTable(tableId, + catalog.newTableOps(tableId), + catalogUri, + catalog.loadTable(tableId)); + // Using AlwaysTrue Predicate to avoid mocking of predicate class + Predicate alwaysTruePredicate = partition -> true; + IOException exception = Assert.expectThrows(IOException.class, () -> icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate)); + Assert.assertEquals(exception.getMessage(), String.format("~%s~ No snapshots found", tableId)); + } + /** Verify that overwritePartition replace data files belonging to given partition col and value */ @Test public void testOverwritePartition() throws IOException { From eb3ef7d0a2a11eb069bf7a285b23b73789a64565 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Sat, 7 Dec 2024 14:43:09 +0530 Subject: [PATCH 2/2] added nosnapshotfound exception --- .../management/copy/iceberg/IcebergTable.java | 32 +++++++++++++------ .../copy/iceberg/IcebergTableTest.java | 20 +++++------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index fed375374a9..9cb1f776f28 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -79,6 +79,12 @@ public TableNotFoundException(TableIdentifier tableId) { } } + public static class NoSnapshotFoundException extends IOException { + public NoSnapshotFoundException(TableIdentifier tableId) { + super("No Snapshot found: '" + tableId + "'"); + } + } + @Getter private final TableIdentifier tableId; /** allow the {@link IcebergCatalog} creating this table to qualify its {@link DatasetDescriptor#getName()} used for lineage, etc. */ @@ -97,19 +103,22 @@ public TableNotFoundException(TableIdentifier tableId) { /** @return metadata info limited to the most recent (current) snapshot */ public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException { TableMetadata current = accessTableMetadata(); - return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current)); + Snapshot currentSnapshot = accessCurrentSnapshot(current); + return createSnapshotInfo(currentSnapshot, Optional.of(current.metadataFileLocation()), Optional.of(current)); } /** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */ public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException { TableMetadata current = accessTableMetadata(); - return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current), true); + Snapshot currentSnapshot = accessCurrentSnapshot(current); + return createSnapshotInfo(currentSnapshot, Optional.of(current.metadataFileLocation()), Optional.of(current), true); } /** @return metadata info for all known snapshots, ordered historically, with *most recent last* */ public Iterator getAllSnapshotInfosIterator() throws IOException { TableMetadata current = accessTableMetadata(); - long currentSnapshotId = current.currentSnapshot().snapshotId(); + Snapshot currentSnapshot = accessCurrentSnapshot(current); + long currentSnapshotId = currentSnapshot.snapshotId(); List snapshots = current.snapshots(); return Iterators.transform(snapshots.iterator(), snapshot -> { try { @@ -172,6 +181,12 @@ protected TableMetadata accessTableMetadata() throws TableNotFoundException { return Optional.ofNullable(current).orElseThrow(() -> new TableNotFoundException(this.tableId)); } + /** @throws {@link IcebergTable.NoSnapshotFoundException} when table is empty i.e. table has zero snapshot */ + protected Snapshot accessCurrentSnapshot(TableMetadata tableMetadata) throws NoSnapshotFoundException { + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + return Optional.ofNullable(currentSnapshot).orElseThrow(() -> new NoSnapshotFoundException(this.tableId)); + } + protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional metadataFileLocation, Optional currentTableMetadata) throws IOException { return createSnapshotInfo(snapshot, metadataFileLocation, currentTableMetadata, false); @@ -239,8 +254,7 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst public List getPartitionSpecificDataFiles(Predicate icebergPartitionFilterPredicate) throws IOException { TableMetadata tableMetadata = accessTableMetadata(); - Snapshot currentSnapshot = Optional.ofNullable(tableMetadata.currentSnapshot()) - .orElseThrow(() -> new IOException(String.format("~%s~ No snapshots found", tableId))); + Snapshot currentSnapshot = accessCurrentSnapshot(tableMetadata); long currentSnapshotId = currentSnapshot.snapshotId(); List knownDataFiles = new ArrayList<>(); GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker(); @@ -287,10 +301,10 @@ protected void overwritePartition(List dataFiles, String partitionColN return; } TableMetadata tableMetadata = accessTableMetadata(); - Optional currentSnapshot = Optional.ofNullable(tableMetadata.currentSnapshot()); - if (currentSnapshot.isPresent()) { - log.info("~{}~ SnapshotId before overwrite: {}", tableId, currentSnapshot.get().snapshotId()); - } else { + try { + Snapshot currentSnapshot = accessCurrentSnapshot(tableMetadata); + log.info("~{}~ SnapshotId before overwrite: {}", tableId, currentSnapshot.snapshotId()); + } catch (NoSnapshotFoundException e) { log.warn("~{}~ No current snapshot found before overwrite", tableId); } OverwriteFiles overwriteFiles = this.table.newOverwrite(); diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java index 69a9688fb13..69983cdacee 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java @@ -126,6 +126,14 @@ public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException { Assert.fail("expected an exception when using table ID '" + bogusTableId + "'"); } + /** Verify failure when attempting to get current snapshot info for an empty table */ + @Test(expectedExceptions = IcebergTable.NoSnapshotFoundException.class) + public void testGetCurrentSnapshotInfoOnEmptyTable() throws IOException { + IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri, + catalog.loadTable(tableId)).getCurrentSnapshotInfo(); + Assert.fail("expected an exception when using table ID '" + tableId + "'"); + } + /** Verify info about all (full) snapshots */ @Test public void testGetAllSnapshotInfosIterator() throws IOException { @@ -253,18 +261,6 @@ public void testGetPartitionSpecificDataFiles() throws IOException { Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); } - @Test - public void testGetPartitionSpecificDataFilesThrowsExceptionWhenNoSnapshotsFound() throws IOException { - IcebergTable icebergTable = new IcebergTable(tableId, - catalog.newTableOps(tableId), - catalogUri, - catalog.loadTable(tableId)); - // Using AlwaysTrue Predicate to avoid mocking of predicate class - Predicate alwaysTruePredicate = partition -> true; - IOException exception = Assert.expectThrows(IOException.class, () -> icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate)); - Assert.assertEquals(exception.getMessage(), String.format("~%s~ No snapshots found", tableId)); - } - /** Verify that overwritePartition replace data files belonging to given partition col and value */ @Test public void testOverwritePartition() throws IOException {