diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc index 4dcc72d6c..18a57f295 100644 --- a/src/iceberg/test/expire_snapshots_test.cc +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -28,6 +28,7 @@ #include "iceberg/avro/avro_register.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_writer.h" +#include "iceberg/snapshot.h" #include "iceberg/statistics_file.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" @@ -350,6 +351,11 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + // Force ReachableFileCleanup: the empty current manifest list makes this an + // unreachable-orphan scenario, which Incremental cannot detect (added-in-ancestor + // files are preserved unless an explicit DELETED entry exists). Specifying the + // snapshot id forces the dispatch to Reachable. + update->ExpireSnapshotId(kExpiredSnapshotId); update->DeleteWith( [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); @@ -388,6 +394,8 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) { std::vector deleted_files; ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + // See note above; same scenario. + update->ExpireSnapshotId(kExpiredSnapshotId); update->DeleteWith( [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); @@ -573,4 +581,102 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) { EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path))); } +// No explicit snapshot id selects the incremental path, which preserves data +// files added by ancestors and removes only the expired manifest + manifest list. +TEST_F(ExpireSnapshotsCleanupTest, IncrementalDispatchPreservesAncestorAddedFiles) { + const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; + const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-manifest-list.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-manifest-list.avro"; + + auto expired_data_manifest = WriteDataManifest( + expired_data_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(expired_data_file_path))}); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, + {expired_data_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + // No ExpireSnapshotId -> incremental dispatch. + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsOk()); + // Manifest + manifest list deleted, data file preserved. + EXPECT_THAT(deleted_files, testing::Contains(expired_data_manifest_path)); + EXPECT_THAT(deleted_files, testing::Contains(expired_manifest_list_path)); + EXPECT_THAT(deleted_files, testing::Not(testing::Contains(expired_data_file_path))); +} + +// Same fixture, but force the Reachable path with ExpireSnapshotId. Reachable +// detects the data file as unreachable from the (empty) current state and +// deletes it -- demonstrating the dispatch actually selects different code paths. +TEST_F(ExpireSnapshotsCleanupTest, ReachableDispatchDeletesUnreachableData) { + const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; + const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-manifest-list.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-manifest-list.avro"; + + auto expired_data_manifest = WriteDataManifest( + expired_data_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(expired_data_file_path))}); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, + {expired_data_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireSnapshotId(kExpiredSnapshotId); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path, + expired_data_manifest_path, + expired_manifest_list_path)); +} + +// Commit() must surface a malformed source-snapshot-id on an ancestor as an +// InvalidArgument from the cherry-pick parse path; the older code silently +// dropped the entry, letting bad metadata flow through successfully. +TEST_F(ExpireSnapshotsCleanupTest, CommitPropagatesMalformedSourceSnapshotId) { + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-malformed-ml.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-malformed-ml.avro"; + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + + auto metadata = ReloadMetadata(); + ASSERT_EQ(metadata->snapshots.size(), 2); + metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path; + metadata->snapshots.at(1)->manifest_list = current_manifest_list_path; + metadata->snapshots.at(1)->summary[SnapshotSummaryFields::kSourceSnapshotId] = + "not-a-number"; + RewriteTable(std::move(metadata)); + + std::vector deleted_files; + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->DeleteWith( + [&deleted_files](const std::string& path) { deleted_files.push_back(path); }); + + EXPECT_THAT(update->Commit(), IsError(ErrorKind::kInvalidArgument)); + EXPECT_TRUE(deleted_files.empty()); +} + } // namespace iceberg diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index ce65882c9..0e1ad865d 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -31,6 +31,7 @@ #include "iceberg/file_io.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_reader.h" +#include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/statistics_file.h" @@ -40,6 +41,7 @@ #include "iceberg/util/error_collector.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/string_util.h" namespace iceberg { @@ -67,14 +69,29 @@ class FileCleanupStrategy { /// /// \param metadata_before_expiration Table metadata before expiration. /// \param metadata_after_expiration Table metadata after expiration. - /// \param expired_snapshot_ids Snapshot IDs that were expired during this operation. /// \param level Controls which types of files are eligible for deletion. virtual Status CleanFiles(const TableMetadata& metadata_before_expiration, const TableMetadata& metadata_after_expiration, - const std::unordered_set& expired_snapshot_ids, CleanupLevel level) = 0; protected: + /// \brief Snapshot IDs present in `before` but not in `after`. + static std::unordered_set ExpiredSnapshotIds(const TableMetadata& before, + const TableMetadata& after) { + std::unordered_set after_ids; + after_ids.reserve(after.snapshots.size()); + for (const auto& s : after.snapshots) { + if (s) after_ids.insert(s->snapshot_id); + } + std::unordered_set expired; + for (const auto& s : before.snapshots) { + if (s && !after_ids.contains(s->snapshot_id)) { + expired.insert(s->snapshot_id); + } + } + return expired; + } + /// \brief Delete a single file void DeleteFile(const std::string& path) { try { @@ -149,8 +166,10 @@ class ReachableFileCleanup : public FileCleanupStrategy { Status CleanFiles(const TableMetadata& metadata_before_expiration, const TableMetadata& metadata_after_expiration, - const std::unordered_set& expired_snapshot_ids, CleanupLevel level) override { + const auto expired_snapshot_ids = + ExpiredSnapshotIds(metadata_before_expiration, metadata_after_expiration); + std::unordered_set retained_snapshot_ids; for (const auto& snapshot : metadata_after_expiration.snapshots) { if (snapshot) { @@ -331,6 +350,296 @@ class ReachableFileCleanup : public FileCleanupStrategy { } }; +/// \brief Incremental file cleanup strategy for simple linear-ancestry expirations. +/// +/// Only safe when: +/// * No snapshot IDs were explicitly listed for expiration. +/// * No removed snapshots lived outside the current main ancestry. +/// * No retained snapshots live outside the current main ancestry. +/// +/// Each manifest is attributed to its writer snapshot via added_snapshot_id, so +/// two snapshot passes are enough -- one over retained snapshots to learn which +/// manifests are still live, one over expired snapshots to learn which manifests, +/// manifest lists, and data files to drop. Cherry-pick protection via +/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was +/// logically introduced by a snapshot whose changes are still present in the +/// current state under a different id. +/// +/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support. +class IncrementalFileCleanup : public FileCleanupStrategy { + public: + using FileCleanupStrategy::FileCleanupStrategy; + + Status CleanFiles(const TableMetadata& metadata_before_expiration, + const TableMetadata& metadata_after_expiration, + CleanupLevel level) override { + const auto expired_snapshot_ids = + ExpiredSnapshotIds(metadata_before_expiration, metadata_after_expiration); + if (expired_snapshot_ids.empty()) { + return {}; + } + + std::unordered_set valid_ids; + valid_ids.reserve(metadata_after_expiration.snapshots.size()); + for (const auto& snapshot : metadata_after_expiration.snapshots) { + if (snapshot) { + valid_ids.insert(snapshot->snapshot_id); + } + } + + auto current_result = metadata_before_expiration.SnapshotById( + metadata_before_expiration.current_snapshot_id); + if (!current_result.has_value() || current_result.value() == nullptr) { + return {}; + } + + // Ancestors of the current table state. Files deleted in a non-ancestor + // snapshot may still belong to the current state (rolled-back commits), + // so we only physically delete files removed by ancestor snapshots. + auto ancestors_result = SnapshotUtil::AncestorsOf( + current_result.value()->snapshot_id, [&metadata_before_expiration](int64_t id) { + return metadata_before_expiration.SnapshotById(id); + }); + if (!ancestors_result.has_value()) { + return {}; + } + std::unordered_set ancestor_ids; + ancestor_ids.reserve(ancestors_result.value().size()); + for (const auto& ancestor : ancestors_result.value()) { + if (ancestor) ancestor_ids.insert(ancestor->snapshot_id); + } + + // Cherry-pick protection: snapshots whose changes were picked into the + // current ancestry under a different snapshot id should not be cleaned up. + // Iterate the ancestor pointers we already have rather than re-looking-up + // each snapshot by id. + std::unordered_set picked_ancestor_snapshot_ids; + for (const auto& ancestor : ancestors_result.value()) { + if (!ancestor) continue; + const auto& summary = ancestor->summary; + auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId); + if (it == summary.end()) continue; + ICEBERG_ASSIGN_OR_RAISE(auto source_id, + StringUtils::ParseNumber(it->second)); + picked_ancestor_snapshot_ids.insert(source_id); + } + + // Find manifests still referenced by a valid snapshot but written by an + // expired snapshot. Their deleted entries point at data files now safe to + // remove and become candidates for manifests_to_scan below. + std::unordered_set valid_manifests; + std::vector manifests_to_scan; + for (const auto& snapshot : metadata_after_expiration.snapshots) { + if (!snapshot) continue; + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io_); + if (!manifests_result.has_value()) continue; // best-effort + for (const auto& manifest : manifests_result.value()) { + valid_manifests.insert(manifest.manifest_path); + + int64_t writer_id = manifest.added_snapshot_id; + bool from_valid_snapshots = valid_ids.contains(writer_id); + bool is_from_ancestor = ancestor_ids.contains(writer_id); + bool is_picked = picked_ancestor_snapshot_ids.contains(writer_id); + if (!from_valid_snapshots && (is_from_ancestor || is_picked) && + manifest.has_deleted_files()) { + manifests_to_scan.push_back(manifest); + } + } + } + + // Find manifests that were only referenced by snapshots that have expired, + // and split them by what kind of cleanup they need: + // - manifests_to_delete: not referenced by any retained snapshot; + // - manifests_to_scan: from a current-state ancestor and has deleted + // entries (data files now safe to drop); + // - manifests_to_revert: written by an expiring non-ancestor snapshot + // and contains added entries -- those data files were never adopted. + std::unordered_set manifest_lists_to_delete; + std::unordered_set manifests_to_delete; + std::vector manifests_to_revert; + for (const auto& snapshot : metadata_before_expiration.snapshots) { + if (!snapshot) continue; + int64_t snapshot_id = snapshot->snapshot_id; + if (valid_ids.contains(snapshot_id)) continue; + + // Skip cherry-picked snapshots; the picked snapshot owns its cleanup. + if (picked_ancestor_snapshot_ids.contains(snapshot_id)) { + continue; + } + + int64_t source_snapshot_id = -1; + auto src_it = snapshot->summary.find(SnapshotSummaryFields::kSourceSnapshotId); + if (src_it != snapshot->summary.end()) { + ICEBERG_ASSIGN_OR_RAISE(source_snapshot_id, + StringUtils::ParseNumber(src_it->second)); + } + // If this commit was cherry-picked from a still-live snapshot, skip -- + // removing its data files would revert additions still in the table. + if (ancestor_ids.contains(source_snapshot_id) || + picked_ancestor_snapshot_ids.contains(source_snapshot_id)) { + continue; + } + + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io_); + if (manifests_result.has_value()) { + for (const auto& manifest : manifests_result.value()) { + if (valid_manifests.contains(manifest.manifest_path)) continue; + manifests_to_delete.insert(manifest.manifest_path); + + int64_t writer_id = manifest.added_snapshot_id; + bool is_from_ancestor = ancestor_ids.contains(writer_id); + bool is_from_expiring_snapshot = expired_snapshot_ids.contains(writer_id); + + if (is_from_ancestor && manifest.has_deleted_files()) { + manifests_to_scan.push_back(manifest); + } + if (!is_from_ancestor && is_from_expiring_snapshot && + manifest.has_added_files()) { + // Files added in this manifest never made it into the current + // ancestry. The is_from_expiring_snapshot guard ensures full + // ancestry between when the manifest was written and now is + // known -- otherwise missing history could hide that the + // snapshot is in fact an ancestor. + manifests_to_revert.push_back(manifest); + } + } + } + + if (!snapshot->manifest_list.empty()) { + manifest_lists_to_delete.insert(snapshot->manifest_list); + } + } + + // Deleting data files + if (level == CleanupLevel::kAll) { + // Manifests may reference partition specs that were pruned during expiration + // when CleanExpiredMetadata is enabled, so resolve schemas/specs against the + // pre-expiration metadata. + auto files_to_delete = FindFilesToDelete( + metadata_before_expiration, manifests_to_scan, manifests_to_revert, valid_ids); + DeleteFiles(files_to_delete); + } + + // Deleting manifest files + DeleteFiles(manifests_to_delete); + + // Deleting manifest-list files + DeleteFiles(manifest_lists_to_delete); + + // Deleting statistics files + if (HasAnyStatisticsFiles(metadata_before_expiration) || + HasAnyStatisticsFiles(metadata_after_expiration)) { + DeleteFiles( + StatisticsFilesToDelete(metadata_before_expiration, metadata_after_expiration)); + } + + return {}; + } + + private: + /// \brief Resolve the data files that the incremental pass identified for deletion. + /// + /// For manifests_to_scan: read DELETED entries whose snapshot id is no longer + /// valid -- those are files an expired-but-ancestral snapshot removed. + /// For manifests_to_revert: read every ADDED entry -- those are files a + /// non-ancestral expired snapshot introduced and the current state never adopted. + std::unordered_set FindFilesToDelete( + const TableMetadata& metadata, const std::vector& manifests_to_scan, + const std::vector& manifests_to_revert, + const std::unordered_set& valid_ids) { + std::unordered_set files_to_delete; + + for (const auto& manifest : manifests_to_scan) { + auto reader_result = MakeManifestReader(manifest, file_io_, metadata); + if (!reader_result.has_value()) continue; + auto entries_result = reader_result.value()->Entries(); + if (!entries_result.has_value()) continue; + for (const auto& entry : entries_result.value()) { + if (entry.status == ManifestStatus::kDeleted && entry.snapshot_id.has_value() && + !valid_ids.contains(entry.snapshot_id.value()) && entry.data_file) { + files_to_delete.insert(entry.data_file->file_path); + } + } + } + + for (const auto& manifest : manifests_to_revert) { + auto reader_result = MakeManifestReader(manifest, file_io_, metadata); + if (!reader_result.has_value()) continue; + auto entries_result = reader_result.value()->Entries(); + if (!entries_result.has_value()) continue; + for (const auto& entry : entries_result.value()) { + if (entry.status == ManifestStatus::kAdded && entry.data_file) { + files_to_delete.insert(entry.data_file->file_path); + } + } + } + + return files_to_delete; + } +}; + +/// \brief True if any retained snapshot sits outside the current main ancestry. +bool HasNonMainSnapshots(const TableMetadata& metadata) { + auto current_result = metadata.SnapshotById(metadata.current_snapshot_id); + if (!current_result.has_value() || current_result.value() == nullptr) { + return !metadata.snapshots.empty(); + } + auto ancestors_result = SnapshotUtil::AncestorsOf( + current_result.value()->snapshot_id, + [&metadata](int64_t id) { return metadata.SnapshotById(id); }); + if (!ancestors_result.has_value()) { + return true; + } + std::unordered_set main_ancestors; + for (const auto& a : ancestors_result.value()) { + if (a) main_ancestors.insert(a->snapshot_id); + } + for (const auto& snapshot : metadata.snapshots) { + if (snapshot && !main_ancestors.contains(snapshot->snapshot_id)) { + return true; + } + } + return false; +} + +/// \brief True if any expired snapshot lived outside the current main ancestry. +/// +/// When `before` has no current snapshot, the main-ancestor set is empty; any +/// removed snapshot then counts as "non-main" and returns true. This guards the +/// dispatch in Finalize() against picking incremental cleanup when the before-state +/// has snapshots but no current pointer. +bool HasRemovedNonMainAncestors(const TableMetadata& before, const TableMetadata& after) { + std::unordered_set main_ancestors; + auto current_result = before.SnapshotById(before.current_snapshot_id); + if (current_result.has_value() && current_result.value() != nullptr) { + auto ancestors_result = SnapshotUtil::AncestorsOf( + current_result.value()->snapshot_id, + [&before](int64_t id) { return before.SnapshotById(id); }); + if (!ancestors_result.has_value()) { + return true; + } + for (const auto& a : ancestors_result.value()) { + if (a) main_ancestors.insert(a->snapshot_id); + } + } + std::unordered_set after_ids; + after_ids.reserve(after.snapshots.size()); + for (const auto& s : after.snapshots) { + if (s) after_ids.insert(s->snapshot_id); + } + for (const auto& snapshot : before.snapshots) { + if (!snapshot) continue; + bool removed = !after_ids.contains(snapshot->snapshot_id); + bool in_main = main_ancestors.contains(snapshot->snapshot_id); + if (removed && !in_main) { + return true; + } + } + return false; +} + } // namespace Result> ExpireSnapshots::Make( @@ -608,16 +917,24 @@ Status ExpireSnapshots::Finalize(Result commit_result) { auto metadata_before_expiration_ptr = apply_result_->metadata_before_expiration; const TableMetadata& metadata_before_expiration = *metadata_before_expiration_ptr; const TableMetadata& metadata_after_expiration = *commit_result.value(); - std::unordered_set expired_ids(apply_result_->snapshot_ids_to_remove.begin(), - apply_result_->snapshot_ids_to_remove.end()); apply_result_.reset(); - // File cleanup is best-effort: log and continue on individual file deletion failures - ReachableFileCleanup strategy(ctx_->table->io(), delete_func_); - return strategy.CleanFiles(metadata_before_expiration, metadata_after_expiration, - expired_ids, cleanup_level_); + // Pick incremental cleanup when the expiration is a simple linear-ancestry walk: + // no explicit snapshot IDs, no removed snapshots outside main ancestry, and no + // retained snapshots outside main ancestry. + const bool can_use_incremental = + !specified_snapshot_id_ && + !HasRemovedNonMainAncestors(metadata_before_expiration, + metadata_after_expiration) && + !HasNonMainSnapshots(metadata_after_expiration); + + if (can_use_incremental) { + return IncrementalFileCleanup(ctx_->table->io(), delete_func_) + .CleanFiles(metadata_before_expiration, metadata_after_expiration, + cleanup_level_); + } + return ReachableFileCleanup(ctx_->table->io(), delete_func_) + .CleanFiles(metadata_before_expiration, metadata_after_expiration, cleanup_level_); } -// TODO(shangxinli): add IncrementalFileCleanup strategy for linear ancestry optimization. - } // namespace iceberg diff --git a/src/iceberg/update/pending_update.cc b/src/iceberg/update/pending_update.cc index 4b3000652..e02ff2823 100644 --- a/src/iceberg/update/pending_update.cc +++ b/src/iceberg/update/pending_update.cc @@ -47,8 +47,7 @@ Status PendingUpdate::Commit() { return std::unexpected(commit_result.error()); } - std::ignore = Finalize(commit_result.value()->metadata().get()); - return {}; + return Finalize(commit_result.value()->metadata().get()); } auto txn = ctx_->transaction->lock(); if (!txn) {