Skip to content

Commit eb3ef7d

Browse files
committed
added nosnapshotfound exception
1 parent 4879219 commit eb3ef7d

File tree

2 files changed

+31
-21
lines changed

2 files changed

+31
-21
lines changed

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ public TableNotFoundException(TableIdentifier tableId) {
7979
}
8080
}
8181

82+
public static class NoSnapshotFoundException extends IOException {
83+
public NoSnapshotFoundException(TableIdentifier tableId) {
84+
super("No Snapshot found: '" + tableId + "'");
85+
}
86+
}
87+
8288
@Getter
8389
private final TableIdentifier tableId;
8490
/** allow the {@link IcebergCatalog} creating this table to qualify its {@link DatasetDescriptor#getName()} used for lineage, etc. */
@@ -97,19 +103,22 @@ public TableNotFoundException(TableIdentifier tableId) {
97103
/** @return metadata info limited to the most recent (current) snapshot */
98104
public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
99105
TableMetadata current = accessTableMetadata();
100-
return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current));
106+
Snapshot currentSnapshot = accessCurrentSnapshot(current);
107+
return createSnapshotInfo(currentSnapshot, Optional.of(current.metadataFileLocation()), Optional.of(current));
101108
}
102109

103110
/** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */
104111
public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException {
105112
TableMetadata current = accessTableMetadata();
106-
return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current), true);
113+
Snapshot currentSnapshot = accessCurrentSnapshot(current);
114+
return createSnapshotInfo(currentSnapshot, Optional.of(current.metadataFileLocation()), Optional.of(current), true);
107115
}
108116

109117
/** @return metadata info for all known snapshots, ordered historically, with *most recent last* */
110118
public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() throws IOException {
111119
TableMetadata current = accessTableMetadata();
112-
long currentSnapshotId = current.currentSnapshot().snapshotId();
120+
Snapshot currentSnapshot = accessCurrentSnapshot(current);
121+
long currentSnapshotId = currentSnapshot.snapshotId();
113122
List<Snapshot> snapshots = current.snapshots();
114123
return Iterators.transform(snapshots.iterator(), snapshot -> {
115124
try {
@@ -172,6 +181,12 @@ protected TableMetadata accessTableMetadata() throws TableNotFoundException {
172181
return Optional.ofNullable(current).orElseThrow(() -> new TableNotFoundException(this.tableId));
173182
}
174183

184+
/** @throws {@link IcebergTable.NoSnapshotFoundException} when table is empty i.e. table has zero snapshot */
185+
protected Snapshot accessCurrentSnapshot(TableMetadata tableMetadata) throws NoSnapshotFoundException {
186+
Snapshot currentSnapshot = tableMetadata.currentSnapshot();
187+
return Optional.ofNullable(currentSnapshot).orElseThrow(() -> new NoSnapshotFoundException(this.tableId));
188+
}
189+
175190
protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation, Optional<TableMetadata> currentTableMetadata)
176191
throws IOException {
177192
return createSnapshotInfo(snapshot, metadataFileLocation, currentTableMetadata, false);
@@ -239,8 +254,7 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
239254
public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate)
240255
throws IOException {
241256
TableMetadata tableMetadata = accessTableMetadata();
242-
Snapshot currentSnapshot = Optional.ofNullable(tableMetadata.currentSnapshot())
243-
.orElseThrow(() -> new IOException(String.format("~%s~ No snapshots found", tableId)));
257+
Snapshot currentSnapshot = accessCurrentSnapshot(tableMetadata);
244258
long currentSnapshotId = currentSnapshot.snapshotId();
245259
List<DataFile> knownDataFiles = new ArrayList<>();
246260
GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker();
@@ -287,10 +301,10 @@ protected void overwritePartition(List<DataFile> dataFiles, String partitionColN
287301
return;
288302
}
289303
TableMetadata tableMetadata = accessTableMetadata();
290-
Optional<Snapshot> currentSnapshot = Optional.ofNullable(tableMetadata.currentSnapshot());
291-
if (currentSnapshot.isPresent()) {
292-
log.info("~{}~ SnapshotId before overwrite: {}", tableId, currentSnapshot.get().snapshotId());
293-
} else {
304+
try {
305+
Snapshot currentSnapshot = accessCurrentSnapshot(tableMetadata);
306+
log.info("~{}~ SnapshotId before overwrite: {}", tableId, currentSnapshot.snapshotId());
307+
} catch (NoSnapshotFoundException e) {
294308
log.warn("~{}~ No current snapshot found before overwrite", tableId);
295309
}
296310
OverwriteFiles overwriteFiles = this.table.newOverwrite();

gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
126126
Assert.fail("expected an exception when using table ID '" + bogusTableId + "'");
127127
}
128128

129+
/** Verify failure when attempting to get current snapshot info for an empty table */
130+
@Test(expectedExceptions = IcebergTable.NoSnapshotFoundException.class)
131+
public void testGetCurrentSnapshotInfoOnEmptyTable() throws IOException {
132+
IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri,
133+
catalog.loadTable(tableId)).getCurrentSnapshotInfo();
134+
Assert.fail("expected an exception when using table ID '" + tableId + "'");
135+
}
136+
129137
/** Verify info about all (full) snapshots */
130138
@Test
131139
public void testGetAllSnapshotInfosIterator() throws IOException {
@@ -253,18 +261,6 @@ public void testGetPartitionSpecificDataFiles() throws IOException {
253261
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
254262
}
255263

256-
@Test
257-
public void testGetPartitionSpecificDataFilesThrowsExceptionWhenNoSnapshotsFound() throws IOException {
258-
IcebergTable icebergTable = new IcebergTable(tableId,
259-
catalog.newTableOps(tableId),
260-
catalogUri,
261-
catalog.loadTable(tableId));
262-
// Using AlwaysTrue Predicate to avoid mocking of predicate class
263-
Predicate<StructLike> alwaysTruePredicate = partition -> true;
264-
IOException exception = Assert.expectThrows(IOException.class, () -> icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate));
265-
Assert.assertEquals(exception.getMessage(), String.format("~%s~ No snapshots found", tableId));
266-
}
267-
268264
/** Verify that overwritePartition replace data files belonging to given partition col and value */
269265
@Test
270266
public void testOverwritePartition() throws IOException {

0 commit comments

Comments
 (0)