Skip to content

Commit e2157e8

Browse files
mayankvadariya authored and raunaqmorarka committed
Revert "Cleanup previous snapshot files during materialized view refresh"
This reverts commit 4813578.
1 parent 52e8a47 commit e2157e8

File tree

2 files changed

+27
-137
lines changed

2 files changed

+27
-137
lines changed

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 27 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2178,7 +2178,33 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
21782178
IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION);
21792179

21802180
long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
2181-
executeExpireSnapshots(table, session, expireTimestampMillis);
2181+
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), table.io().properties());
2182+
List<Location> pathsToDelete = new ArrayList<>();
2183+
// deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
2184+
Consumer<String> deleteFunction = path -> {
2185+
pathsToDelete.add(Location.of(path));
2186+
if (pathsToDelete.size() == DELETE_BATCH_SIZE) {
2187+
try {
2188+
fileSystem.deleteFiles(pathsToDelete);
2189+
pathsToDelete.clear();
2190+
}
2191+
catch (IOException e) {
2192+
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
2193+
}
2194+
}
2195+
};
2196+
2197+
try {
2198+
table.expireSnapshots()
2199+
.expireOlderThan(expireTimestampMillis)
2200+
.deleteWith(deleteFunction)
2201+
.commit();
2202+
2203+
fileSystem.deleteFiles(pathsToDelete);
2204+
}
2205+
catch (IOException e) {
2206+
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
2207+
}
21822208
}
21832209

21842210
private static void validateTableExecuteParameters(
@@ -3782,15 +3808,6 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
37823808
commitUpdateAndTransaction(appendFiles, session, transaction, "refresh materialized view");
37833809
transaction = null;
37843810
fromSnapshotForRefresh = Optional.empty();
3785-
3786-
// cleanup old snapshots
3787-
try {
3788-
executeExpireSnapshots(icebergTable, session, System.currentTimeMillis());
3789-
}
3790-
catch (Exception e) {
3791-
log.error(e, "Failed to delete old snapshot files during materialized view refresh");
3792-
}
3793-
37943811
return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
37953812
.map(CommitTaskData::path)
37963813
.collect(toImmutableList())));
@@ -4075,37 +4092,6 @@ private void beginTransaction(Table icebergTable)
40754092
transaction = catalog.newTransaction(icebergTable);
40764093
}
40774094

4078-
private void executeExpireSnapshots(Table icebergTable, ConnectorSession session, long expireTimestampMillis)
4079-
{
4080-
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), icebergTable.io().properties());
4081-
List<Location> pathsToDelete = new ArrayList<>();
4082-
// deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
4083-
Consumer<String> deleteFunction = path -> {
4084-
pathsToDelete.add(Location.of(path));
4085-
if (pathsToDelete.size() == DELETE_BATCH_SIZE) {
4086-
try {
4087-
fileSystem.deleteFiles(pathsToDelete);
4088-
pathsToDelete.clear();
4089-
}
4090-
catch (IOException | UncheckedIOException e) {
4091-
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
4092-
}
4093-
}
4094-
};
4095-
4096-
try {
4097-
icebergTable.expireSnapshots()
4098-
.expireOlderThan(expireTimestampMillis)
4099-
.deleteWith(deleteFunction)
4100-
.commit();
4101-
4102-
fileSystem.deleteFiles(pathsToDelete);
4103-
}
4104-
catch (IOException | UncheckedIOException e) {
4105-
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
4106-
}
4107-
}
4108-
41094095
private static IcebergTableHandle checkValidTableHandle(ConnectorTableHandle tableHandle)
41104096
{
41114097
requireNonNull(tableHandle, "tableHandle is null");

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java

Lines changed: 0 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import io.trino.Session;
2020
import io.trino.connector.MockConnectorFactory;
2121
import io.trino.connector.MockConnectorPlugin;
22-
import io.trino.filesystem.FileEntry;
23-
import io.trino.filesystem.FileIterator;
2422
import io.trino.filesystem.Location;
2523
import io.trino.filesystem.TrinoFileSystem;
2624
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
@@ -53,7 +51,6 @@
5351
import org.junit.jupiter.api.BeforeAll;
5452
import org.junit.jupiter.api.Test;
5553

56-
import java.io.IOException;
5754
import java.time.ZonedDateTime;
5855
import java.util.List;
5956
import java.util.Map;
@@ -1111,99 +1108,6 @@ public void testRefreshWithCompaction()
11111108
assertUpdate("DROP TABLE %s".formatted(sourceTableName));
11121109
}
11131110

1114-
@Test
1115-
void testPreviousSnapshotCleanupDuringRefresh()
1116-
throws IOException
1117-
{
1118-
String sourceTableName = "source_table" + randomNameSuffix();
1119-
String materializedViewName = "test_materialized_view" + randomNameSuffix();
1120-
1121-
// create source table and an MV
1122-
assertUpdate("CREATE TABLE " + sourceTableName + " (a int, b varchar)");
1123-
assertUpdate("INSERT INTO " + sourceTableName + " VALUES (1, 'abc'), (2, 'def')", 2);
1124-
assertUpdate("CREATE MATERIALIZED VIEW " + materializedViewName + " AS SELECT a, b FROM " + sourceTableName + " WHERE a < 3 OR a > 5");
1125-
// Until first MV refresh no data files are created hence perform first MV refresh to get data files created for the MV
1126-
assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 2);
1127-
1128-
TrinoFileSystem fileSystemFactory = getFileSystemFactory(getQueryRunner()).create(ConnectorIdentity.ofUser("test"));
1129-
1130-
// Identify different types of files containing in an MV
1131-
Location metadataLocation = Location.of(getStorageMetadataLocation(materializedViewName));
1132-
FileIterator tableFiles = fileSystemFactory.listFiles(Location.of(metadataLocation.toString().substring(0, metadataLocation.toString().indexOf("/metadata"))));
1133-
ImmutableSet.Builder<FileEntry> previousDataFiles = ImmutableSet.builder();
1134-
ImmutableSet.Builder<FileEntry> previousMetadataFiles = ImmutableSet.builder();
1135-
ImmutableSet.Builder<FileEntry> previousManifestsFiles = ImmutableSet.builder();
1136-
while (tableFiles.hasNext()) {
1137-
FileEntry file = tableFiles.next();
1138-
String location = file.location().toString();
1139-
if (location.contains("/data/")) {
1140-
previousDataFiles.add(file);
1141-
}
1142-
else if (location.contains("/metadata/") && location.endsWith(".json")) {
1143-
previousMetadataFiles.add(file);
1144-
}
1145-
else if (location.contains("/metadata") && !location.contains("snap-") && location.endsWith(".avro")) {
1146-
previousManifestsFiles.add(file);
1147-
}
1148-
}
1149-
1150-
// Execute MV refresh after deleting existing records and inserting new records in source table
1151-
assertUpdate("DELETE FROM " + sourceTableName + " WHERE a = 1 OR a = 2", 2);
1152-
assertQueryReturnsEmptyResult("SELECT * FROM " + sourceTableName);
1153-
assertUpdate("INSERT INTO " + sourceTableName + " VALUES (7, 'pqr'), (8, 'xyz')", 2);
1154-
assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 2);
1155-
assertThat(query("SELECT * FROM " + materializedViewName))
1156-
.matches("VALUES (7, VARCHAR 'pqr'), (8, VARCHAR 'xyz')");
1157-
1158-
// Identify different types of files containing in an MV after MV refresh
1159-
Location latestMetadataLocation = Location.of(getStorageMetadataLocation(materializedViewName));
1160-
FileIterator latestTableFiles = fileSystemFactory.listFiles(Location.of(latestMetadataLocation.toString().substring(0, latestMetadataLocation.toString().indexOf("/metadata"))));
1161-
ImmutableSet.Builder<FileEntry> currentDataFiles = ImmutableSet.builder();
1162-
ImmutableSet.Builder<FileEntry> currentMetadataFiles = ImmutableSet.builder();
1163-
ImmutableSet.Builder<FileEntry> currentManifestsFiles = ImmutableSet.builder();
1164-
while (latestTableFiles.hasNext()) {
1165-
FileEntry file = latestTableFiles.next();
1166-
String location = file.location().toString();
1167-
if (location.contains("/data/")) {
1168-
currentDataFiles.add(file);
1169-
}
1170-
else if (location.contains("/metadata/") && location.endsWith(".json")) {
1171-
currentMetadataFiles.add(file);
1172-
}
1173-
else if (location.contains("/metadata") && !location.contains("snap-") && location.endsWith(".avro")) {
1174-
currentManifestsFiles.add(file);
1175-
}
1176-
}
1177-
1178-
// data files from previous snapshot are absent in latest MV snapshot as those are cleaned up after MV refresh
1179-
assertThat(previousDataFiles.build())
1180-
.isNotEmpty()
1181-
.satisfies(dataFilesBeforeMvRefresh ->
1182-
assertThat(currentDataFiles.build())
1183-
.isNotEmpty()
1184-
.doesNotContainAnyElementsOf(dataFilesBeforeMvRefresh));
1185-
1186-
// metadata files from previous snapshot are still present in latest MV snapshot as those are not cleaned up after MV refresh
1187-
assertThat(previousMetadataFiles.build())
1188-
.isNotEmpty()
1189-
.satisfies(metadataFilesBeforeMvRefresh ->
1190-
assertThat(currentMetadataFiles.build())
1191-
.isNotEmpty()
1192-
.containsAll(metadataFilesBeforeMvRefresh));
1193-
1194-
// manifests files from previous snapshot are absent in latest MV snapshot as those are cleaned up after MV refresh
1195-
assertThat(previousManifestsFiles.build())
1196-
.isNotEmpty()
1197-
.satisfies(manifestsBeforeMvRefresh ->
1198-
assertThat(currentManifestsFiles.build())
1199-
.isNotEmpty()
1200-
.doesNotContainAnyElementsOf(manifestsBeforeMvRefresh));
1201-
1202-
// cleanup
1203-
assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
1204-
assertUpdate("DROP TABLE " + sourceTableName);
1205-
}
1206-
12071111
protected String getColumnComment(String tableName, String columnName)
12081112
{
12091113
return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'");

0 commit comments

Comments (0)