Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -2178,7 +2178,33 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION);

long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
executeExpireSnapshots(table, session, expireTimestampMillis);
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), table.io().properties());
List<Location> pathsToDelete = new ArrayList<>();
// deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
Consumer<String> deleteFunction = path -> {
pathsToDelete.add(Location.of(path));
if (pathsToDelete.size() == DELETE_BATCH_SIZE) {
try {
fileSystem.deleteFiles(pathsToDelete);
pathsToDelete.clear();
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
}
}
};

try {
table.expireSnapshots()
.expireOlderThan(expireTimestampMillis)
.deleteWith(deleteFunction)
.commit();

fileSystem.deleteFiles(pathsToDelete);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
}
}

private static void validateTableExecuteParameters(
@@ -3782,15 +3808,6 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
commitUpdateAndTransaction(appendFiles, session, transaction, "refresh materialized view");
transaction = null;
fromSnapshotForRefresh = Optional.empty();

// cleanup old snapshots
try {
executeExpireSnapshots(icebergTable, session, System.currentTimeMillis());
}
catch (Exception e) {
log.error(e, "Failed to delete old snapshot files during materialized view refresh");
}

return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
.map(CommitTaskData::path)
.collect(toImmutableList())));
@@ -4075,37 +4092,6 @@ private void beginTransaction(Table icebergTable)
transaction = catalog.newTransaction(icebergTable);
}

/**
 * Expires snapshots of {@code icebergTable} older than {@code expireTimestampMillis} and removes
 * the files reported to the delete callback through the Trino file system, in batches of
 * {@code DELETE_BATCH_SIZE} paths.
 *
 * @param icebergTable table whose snapshots are expired
 * @param session session whose identity is used to create the file system
 * @param expireTimestampMillis cutoff passed to {@code expireOlderThan}
 * @throws TrinoException with {@code ICEBERG_FILESYSTEM_ERROR} when a file deletion raises
 * {@link IOException} or {@link UncheckedIOException}
 */
private void executeExpireSnapshots(Table icebergTable, ConnectorSession session, long expireTimestampMillis)
{
    TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), icebergTable.io().properties());
    List<Location> pendingDeletes = new ArrayList<>();

    // The callback is not accessed from multiple threads unless .executeDeleteWith() is used,
    // so the plain ArrayList buffer needs no synchronization here.
    Consumer<String> batchingDelete = filePath -> {
        pendingDeletes.add(Location.of(filePath));
        if (pendingDeletes.size() != DELETE_BATCH_SIZE) {
            // Keep buffering until a full batch has accumulated
            return;
        }
        try {
            fileSystem.deleteFiles(pendingDeletes);
            // Only reset the buffer once the batch was deleted successfully
            pendingDeletes.clear();
        }
        catch (IOException | UncheckedIOException e) {
            throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
        }
    };

    try {
        icebergTable.expireSnapshots()
                .expireOlderThan(expireTimestampMillis)
                .deleteWith(batchingDelete)
                .commit();

        // Flush whatever is left in the buffer (the last, partially filled batch)
        fileSystem.deleteFiles(pendingDeletes);
    }
    catch (IOException | UncheckedIOException e) {
        throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
    }
}

private static IcebergTableHandle checkValidTableHandle(ConnectorTableHandle tableHandle)
{
requireNonNull(tableHandle, "tableHandle is null");
Original file line number Diff line number Diff line change
@@ -19,8 +19,6 @@
import io.trino.Session;
import io.trino.connector.MockConnectorFactory;
import io.trino.connector.MockConnectorPlugin;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
@@ -53,7 +51,6 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
@@ -1111,99 +1108,6 @@ public void testRefreshWithCompaction()
assertUpdate("DROP TABLE %s".formatted(sourceTableName));
}

/**
 * Verifies which storage-table files of a materialized view survive a second
 * {@code REFRESH MATERIALIZED VIEW}: data files and manifests present before the refresh must be
 * gone afterwards, while the {@code .json} metadata files must still be present.
 */
@Test
void testPreviousSnapshotCleanupDuringRefresh()
        throws IOException
{
    String sourceTableName = "source_table" + randomNameSuffix();
    String materializedViewName = "test_materialized_view" + randomNameSuffix();

    // create source table and an MV
    assertUpdate("CREATE TABLE " + sourceTableName + " (a int, b varchar)");
    assertUpdate("INSERT INTO " + sourceTableName + " VALUES (1, 'abc'), (2, 'def')", 2);
    assertUpdate("CREATE MATERIALIZED VIEW " + materializedViewName + " AS SELECT a, b FROM " + sourceTableName + " WHERE a < 3 OR a > 5");
    // Until the first MV refresh no data files exist, so refresh once to get data files created for the MV
    assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 2);

    TrinoFileSystem fileSystem = getFileSystemFactory(getQueryRunner()).create(ConnectorIdentity.ofUser("test"));

    // Classify the files that make up the MV storage table before the second refresh
    ImmutableSet.Builder<FileEntry> previousDataFilesBuilder = ImmutableSet.builder();
    ImmutableSet.Builder<FileEntry> previousMetadataFilesBuilder = ImmutableSet.builder();
    ImmutableSet.Builder<FileEntry> previousManifestFilesBuilder = ImmutableSet.builder();
    collectMaterializedViewStorageFiles(
            fileSystem,
            materializedViewName,
            previousDataFilesBuilder,
            previousMetadataFilesBuilder,
            previousManifestFilesBuilder);

    // Execute MV refresh after deleting existing records and inserting new records in source table
    assertUpdate("DELETE FROM " + sourceTableName + " WHERE a = 1 OR a = 2", 2);
    assertQueryReturnsEmptyResult("SELECT * FROM " + sourceTableName);
    assertUpdate("INSERT INTO " + sourceTableName + " VALUES (7, 'pqr'), (8, 'xyz')", 2);
    assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 2);
    assertThat(query("SELECT * FROM " + materializedViewName))
            .matches("VALUES (7, VARCHAR 'pqr'), (8, VARCHAR 'xyz')");

    // Classify the files again after the second refresh
    ImmutableSet.Builder<FileEntry> currentDataFilesBuilder = ImmutableSet.builder();
    ImmutableSet.Builder<FileEntry> currentMetadataFilesBuilder = ImmutableSet.builder();
    ImmutableSet.Builder<FileEntry> currentManifestFilesBuilder = ImmutableSet.builder();
    collectMaterializedViewStorageFiles(
            fileSystem,
            materializedViewName,
            currentDataFilesBuilder,
            currentMetadataFilesBuilder,
            currentManifestFilesBuilder);

    // data files from previous snapshot are absent in latest MV snapshot as those are cleaned up after MV refresh
    assertThat(previousDataFilesBuilder.build())
            .isNotEmpty()
            .satisfies(dataFilesBeforeMvRefresh ->
                    assertThat(currentDataFilesBuilder.build())
                            .isNotEmpty()
                            .doesNotContainAnyElementsOf(dataFilesBeforeMvRefresh));

    // metadata files from previous snapshot are still present in latest MV snapshot as those are not cleaned up after MV refresh
    assertThat(previousMetadataFilesBuilder.build())
            .isNotEmpty()
            .satisfies(metadataFilesBeforeMvRefresh ->
                    assertThat(currentMetadataFilesBuilder.build())
                            .isNotEmpty()
                            .containsAll(metadataFilesBeforeMvRefresh));

    // manifest files from previous snapshot are absent in latest MV snapshot as those are cleaned up after MV refresh
    assertThat(previousManifestFilesBuilder.build())
            .isNotEmpty()
            .satisfies(manifestsBeforeMvRefresh ->
                    assertThat(currentManifestFilesBuilder.build())
                            .isNotEmpty()
                            .doesNotContainAnyElementsOf(manifestsBeforeMvRefresh));

    // cleanup
    assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
    assertUpdate("DROP TABLE " + sourceTableName);
}

/**
 * Lists every file under the storage location of {@code materializedViewName} and sorts each
 * one into at most one of the supplied builders, based on its path:
 * <ul>
 * <li>paths containing {@code /data/} go to {@code dataFiles};</li>
 * <li>paths containing {@code /metadata/} and ending in {@code .json} go to {@code metadataFiles};</li>
 * <li>paths containing {@code /metadata}, not containing {@code snap-} and ending in {@code .avro}
 * go to {@code manifestFiles}.</li>
 * </ul>
 * Files matching none of the rules are ignored.
 */
private void collectMaterializedViewStorageFiles(
        TrinoFileSystem fileSystem,
        String materializedViewName,
        ImmutableSet.Builder<FileEntry> dataFiles,
        ImmutableSet.Builder<FileEntry> metadataFiles,
        ImmutableSet.Builder<FileEntry> manifestFiles)
        throws IOException
{
    // The listing root is the metadata file location cut off just before its "/metadata" segment
    String metadataLocation = Location.of(getStorageMetadataLocation(materializedViewName)).toString();
    Location tableRoot = Location.of(metadataLocation.substring(0, metadataLocation.indexOf("/metadata")));

    FileIterator tableFiles = fileSystem.listFiles(tableRoot);
    while (tableFiles.hasNext()) {
        FileEntry file = tableFiles.next();
        String location = file.location().toString();
        if (location.contains("/data/")) {
            dataFiles.add(file);
        }
        else if (location.contains("/metadata/") && location.endsWith(".json")) {
            metadataFiles.add(file);
        }
        else if (location.contains("/metadata") && !location.contains("snap-") && location.endsWith(".avro")) {
            manifestFiles.add(file);
        }
    }
}

protected String getColumnComment(String tableName, String columnName)
{
return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'");