From 141eadc2dd09ca937f1d9d607fe4302f4f32ffc6 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 13 Jan 2026 22:42:06 +0800 Subject: [PATCH 1/8] feat: add FastAppend --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/table.cc | 7 + src/iceberg/table.h | 3 + src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/fast_append_test.cc | 197 +++++++++++++++++++++ src/iceberg/transaction.cc | 9 +- src/iceberg/transaction.h | 3 + src/iceberg/type_fwd.h | 3 +- src/iceberg/update/append_files.h | 70 ++++++++ src/iceberg/update/fast_append.cc | 248 +++++++++++++++++++++++++++ src/iceberg/update/fast_append.h | 150 ++++++++++++++++ src/iceberg/update/meson.build | 1 + src/iceberg/update/snapshot_update.h | 7 +- 14 files changed, 698 insertions(+), 3 deletions(-) create mode 100644 src/iceberg/test/fast_append_test.cc create mode 100644 src/iceberg/update/append_files.h create mode 100644 src/iceberg/update/fast_append.cc create mode 100644 src/iceberg/update/fast_append.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 35c312f60..e44d9cd76 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -85,6 +85,7 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/expire_snapshots.cc + update/fast_append.cc update/pending_update.cc update/snapshot_update.cc update/update_location.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 317b4fa9e..d35c15a7f 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -103,6 +103,7 @@ iceberg_sources = files( 'transform_function.cc', 'type.cc', 'update/expire_snapshots.cc', + 'update/fast_append.cc', 'update/pending_update.cc', 'update/snapshot_update.cc', 'update/update_location.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 5c406debc..28ee285ab 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -199,6 +199,13 @@ Result> Table::NewUpdateLocation() { return transaction->NewUpdateLocation(); } +Result> Table::NewFastAppend() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewFastAppend(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index fd346e15a..75cad6e1e 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -156,6 +156,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewUpdateLocation(); + /// \brief Create a new FastAppend to append data files and commit the changes. + virtual Result> NewFastAppend(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index d243a48bf..3414a862e 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES expire_snapshots_test.cc + fast_append_test.cc transaction_test.cc update_location_test.cc update_partition_spec_test.cc diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc new file mode 100644 index 000000000..729ac97ee --- /dev/null +++ b/src/iceberg/test/fast_append_test.cc @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/fast_append.h" + +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/test_resource.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/util/uuid.h" + +namespace iceberg { + +class FastAppendTest : public UpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + UpdateTestBase::SetUp(); + + ASSERT_THAT(catalog_->DropTable(table_ident_, /*purge=*/false), IsOk()); + + auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", + table_location_, Uuid::GenerateV7().ToString()); + ICEBERG_UNWRAP_OR_FAIL( + auto metadata, ReadTableMetadataFromResource("TableMetadataV2ValidMinimal.json")); + metadata->location = table_location_; + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), + IsOk()); + ICEBERG_UNWRAP_OR_FAIL(table_, + catalog_->RegisterTable(table_ident_, metadata_location)); + + // Get partition spec and schema from the base table + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + // Create test data files + file_a_ = CreateDataFile("/data/file_a.parquet", 100, 1024); + file_b_ = CreateDataFile("/data/file_b.parquet", 200, 2048); + } + + std::shared_ptr CreateDataFile(const std::string& path, int64_t record_count, + int64_t size, int64_t partition_value = 0) { + auto data_file = std::make_shared(); + data_file->content = DataFile::Content::kData; + data_file->file_path = table_location_ + path; + data_file->file_format = FileFormatType::kParquet; + // The base table has partition spec with identity(x), so we need 1 partition value + data_file->partition = + PartitionValues(std::vector{Literal::Long(partition_value)}); + data_file->file_size_in_bytes = size; + data_file->record_count = record_count; + data_file->partition_spec_id = spec_->spec_id(); + return data_file; + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +TEST_F(FastAppendTest, AppendDataFile) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("added-data-files"), "1"); + EXPECT_EQ(snapshot->summary.at("added-records"), "100"); + EXPECT_EQ(snapshot->summary.at("added-files-size"), "1024"); +} + +TEST_F(FastAppendTest, AppendMultipleDataFiles) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + fast_append->AppendFile(file_b_); + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("added-data-files"), "2"); + EXPECT_EQ(snapshot->summary.at("added-records"), "300"); + EXPECT_EQ(snapshot->summary.at("added-files-size"), "3072"); +} + +TEST_F(FastAppendTest, AppendManyFiles) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + + int64_t total_records = 0; + int64_t total_size = 0; + constexpr int kFileCount = 10; + for (int index = 0; index < kFileCount; ++index) { + auto data_file = CreateDataFile(std::format("/data/file_{}.parquet", index), + /*record_count=*/10 + index, + /*size=*/100 + index * 10, + /*partition_value=*/index % 2); + total_records += data_file->record_count; + total_size += data_file->file_size_in_bytes; + fast_append->AppendFile(std::move(data_file)); + } + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("added-data-files"), std::to_string(kFileCount)); + EXPECT_EQ(snapshot->summary.at("added-records"), std::to_string(total_records)); + EXPECT_EQ(snapshot->summary.at("added-files-size"), std::to_string(total_size)); +} + +TEST_F(FastAppendTest, EmptyTableAppendUpdatesSequenceNumbers) { + EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot")); + const int64_t base_sequence_number = table_->metadata()->last_sequence_number; + + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->sequence_number, base_sequence_number + 1); + EXPECT_EQ(table_->metadata()->last_sequence_number, base_sequence_number + 1); +} + +TEST_F(FastAppendTest, AppendNullFile) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(nullptr); + + auto result = fast_append->Commit(); + EXPECT_FALSE(result.has_value()); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); + EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot")); +} + +TEST_F(FastAppendTest, AppendDuplicateFile) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + fast_append->AppendFile(file_a_); // Add same file twice + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + // Should only count the file once + EXPECT_EQ(snapshot->summary.at("added-data-files"), "1"); + EXPECT_EQ(snapshot->summary.at("added-records"), "100"); +} + +TEST_F(FastAppendTest, SetSnapshotProperty) { + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->Set("custom-property", "custom-value"); + fast_append->AppendFile(file_a_); + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("custom-property"), "custom-value"); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 10a87e653..d10586a4c 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -23,7 +23,6 @@ #include #include "iceberg/catalog.h" -#include "iceberg/schema.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" @@ -31,6 +30,7 @@ #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" #include "iceberg/update/expire_snapshots.h" +#include "iceberg/update/fast_append.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_location.h" @@ -293,4 +293,11 @@ Result> Transaction::NewUpdateLocation() { return update_location; } +Result> Transaction::NewFastAppend() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr fast_append, + FastAppend::Make(table_->name().name, shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(fast_append)); + return fast_append; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 7133a3b5d..0f567312a 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -86,6 +86,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateLocation(); + /// \brief Create a new FastAppend to append data files and commit the changes. + Result> NewFastAppend(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 251334c14..7706dfd7a 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -187,7 +187,9 @@ class TableUpdateContext; class Transaction; /// \brief Update family. +class AppendFiles; class ExpireSnapshots; +class FastAppend; class PendingUpdate; class SnapshotUpdate; class UpdateLocation; @@ -200,7 +202,6 @@ class UpdateSortOrder; /// TODO: Forward declarations below are not added yet. /// ---------------------------------------------------------------------------- -class AppendFiles; class EncryptedKey; } // namespace iceberg diff --git a/src/iceberg/update/append_files.h b/src/iceberg/update/append_files.h new file mode 100644 index 000000000..2c255a6b9 --- /dev/null +++ b/src/iceberg/update/append_files.h @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief API for appending new files in a table. +/// +/// This API accumulates file additions, produces a new Snapshot of the table, and commits +/// that snapshot as the current. +/// +/// When committing, these changes will be applied to the latest table snapshot. Commit +/// conflicts will be resolved by applying the changes to the new latest snapshot and +/// reattempting the commit. +class ICEBERG_EXPORT AppendFiles { + public: + virtual ~AppendFiles() = default; + + /// \brief Append a DataFile to the table. + /// + /// \param file A data file + /// \return Reference to this for method chaining + virtual AppendFiles& AppendFile(std::shared_ptr file) = 0; + + /// \brief Append a ManifestFile to the table. + /// + /// The manifest must contain only appended files. All files in the manifest will be + /// appended to the table in the snapshot created by this update. + /// + /// The manifest will be used directly if snapshot ID inheritance is enabled (all tables + /// with the format version > 1 or if the inheritance is enabled explicitly via table + /// properties). Otherwise, the manifest will be rewritten to assign all entries this + /// update's snapshot ID. + /// + /// If the manifest is rewritten, it is always the responsibility of the caller to + /// manage the lifecycle of the original manifest. If the manifest is used directly, it + /// should never be deleted manually if the commit succeeds as it will become part of + /// the table metadata and will be cleaned upon expiry. If the manifest gets merged with + /// others while preparing a new snapshot, it will be deleted automatically if this + /// operation is successful. If the commit fails, the manifest will never be deleted, + /// and it is up to the caller whether to delete or reuse it. + /// + /// \param file A manifest file + /// \return Reference to this for method chaining + virtual AppendFiles& AppendManifest(const ManifestFile& file) = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc new file mode 100644 index 000000000..575d9c78a --- /dev/null +++ b/src/iceberg/update/fast_append.cc @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/fast_append.h" + +#include +#include +#include +#include + +#include "iceberg/constants.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> FastAppend::Make( + std::string table_name, std::shared_ptr transaction) { + return std::unique_ptr( + new FastAppend(std::move(table_name), std::move(transaction))); +} + +FastAppend::FastAppend(std::string table_name, std::shared_ptr transaction) + : SnapshotUpdate(std::move(transaction)), table_name_(std::move(table_name)) {} + +FastAppend& FastAppend::AppendFile(std::shared_ptr file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(), + "Data file must have partition spec ID"); + + int32_t spec_id = file->partition_spec_id.value(); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec, Spec(spec_id)); + + auto& data_files = new_data_files_by_spec_[spec_id]; + auto [iter, inserted] = data_files.insert(file); + if (inserted) { + has_new_files_ = true; + ICEBERG_BUILDER_RETURN_IF_ERROR(summary_.AddedFile(*spec, *file)); + } + + return *this; +} + +FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) { + ICEBERG_BUILDER_CHECK(!manifest.has_existing_files(), + "Cannot append manifest with existing files"); + ICEBERG_BUILDER_CHECK(!manifest.has_deleted_files(), + "Cannot append manifest with deleted files"); + ICEBERG_BUILDER_CHECK(manifest.added_snapshot_id == kInvalidSnapshotId, + "Snapshot id must be assigned during commit"); + ICEBERG_BUILDER_CHECK(manifest.sequence_number == TableMetadata::kInvalidSequenceNumber, + "Sequence number must be assigned during commit"); + + if (can_inherit_snapshot_id() && (manifest.added_snapshot_id == kInvalidSnapshotId)) { + summary_.AddedManifest(manifest); + append_manifests_.push_back(manifest); + } else { + // The manifest must be rewritten with this update's snapshot ID + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest)); + rewritten_append_manifests_.push_back(copied_manifest); + } + + return *this; +} + +FastAppend& FastAppend::ToBranch(const std::string& branch) { + ICEBERG_BUILDER_RETURN_IF_ERROR(SetTargetBranch(branch)); + return *this; +} + +FastAppend& FastAppend::Set(const std::string& property, const std::string& value) { + summary_.Set(property, value); + return *this; +} + +std::string FastAppend::operation() { return DataOperation::kAppend; } + +Result> FastAppend::Apply( + const TableMetadata& metadata_to_update, const std::shared_ptr& snapshot) { + std::vector manifests; + + ICEBERG_ASSIGN_OR_RAISE(auto new_written_manifests, WriteNewManifests()); + if (!new_written_manifests.empty()) { + manifests.insert(manifests.end(), new_written_manifests.begin(), + new_written_manifests.end()); + } + + // Transform append manifests and rewritten append manifests with snapshot ID + int64_t snapshot_id = SnapshotId(); + for (const auto& manifest : append_manifests_) { + ManifestFile updated = manifest; + updated.added_snapshot_id = snapshot_id; + manifests.push_back(updated); + } + + for (const auto& manifest : rewritten_append_manifests_) { + ManifestFile updated = manifest; + updated.added_snapshot_id = snapshot_id; + manifests.push_back(updated); + } + + // Add all manifests from the snapshot + if (snapshot != nullptr) { + // Use SnapshotCache to get manifests, similar to snapshot_update.cc + auto cached_snapshot = SnapshotCache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests_span, + cached_snapshot.Manifests(transaction_->table()->io())); + std::vector snapshot_manifests(snapshot_manifests_span.begin(), + snapshot_manifests_span.end()); + manifests.insert(manifests.end(), snapshot_manifests.begin(), + snapshot_manifests.end()); + } + + return manifests; +} + +std::unordered_map FastAppend::Summary() { + summary_.SetPartitionSummaryLimit( + base().properties.Get(TableProperties::kWritePartitionSummaryLimit)); + return summary_.Build(); +} + +void FastAppend::CleanUncommitted(const std::unordered_set& committed) { + // Clean up new manifests that were written but not committed + if (!new_manifests_.empty()) { + for (const auto& manifest : new_manifests_) { + if (committed.find(manifest.manifest_path) == committed.end()) { + std::ignore = DeleteFile(manifest.manifest_path); + } + } + new_manifests_.clear(); + } + + // Clean up only rewritten_append_manifests as they are always owned by the table + // Don't clean up append_manifests as they are added to the manifest list and are + // not compacted + if (!rewritten_append_manifests_.empty()) { + for (const auto& manifest : rewritten_append_manifests_) { + if (committed.find(manifest.manifest_path) == committed.end()) { + std::ignore = DeleteFile(manifest.manifest_path); + } + } + } +} + +bool FastAppend::CleanupAfterCommit() const { + // Cleanup after committing is disabled for FastAppend unless there are + // rewritten_append_manifests because: + // 1.) Appended manifests are never rewritten + // 2.) Manifests which are written out as part of appendFile are already cleaned + // up between commit attempts in writeNewManifests + return !rewritten_append_manifests_.empty(); +} + +Result> FastAppend::Spec(int32_t spec_id) { + return base().PartitionSpecById(spec_id); +} + +Result FastAppend::CopyManifest(const ManifestFile& manifest) { + const TableMetadata& current = base(); + ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, + current.PartitionSpecById(manifest.partition_spec_id)); + + // Read the manifest entries + ICEBERG_ASSIGN_OR_RAISE( + auto reader, + ManifestReader::Make(manifest, transaction_->table()->io(), schema, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + + // Create a new manifest writer + // Generate a unique manifest path using the transaction's metadata location + std::string filename = std::format("copy-m{}.avro", copy_manifest_count_++); + std::string new_manifest_path = transaction_->MetadataFileLocation(filename); + int64_t snapshot_id = SnapshotId(); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter( + current.format_version, snapshot_id, new_manifest_path, + transaction_->table()->io(), spec, schema, ManifestContent::kData, + /*first_row_id=*/current.next_row_id)); + + // Write all entries as added entries with the new snapshot ID + for (auto& entry : entries) { + ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded, + "Manifest to copy must only contain added entries"); + entry.snapshot_id = snapshot_id; + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); + } + + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile()); + + summary_.AddedManifest(new_manifest); + + return new_manifest; +} + +Result> FastAppend::WriteNewManifests() { + // If there are new files and manifests were already written, clean them up + if (has_new_files_ && !new_manifests_.empty()) { + for (const auto& manifest : new_manifests_) { + ICEBERG_RETURN_UNEXPECTED(DeleteFile(manifest.manifest_path)); + } + new_manifests_.clear(); + } + + // Write new manifests if there are new data files + if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) { + for (const auto& [spec_id, data_files] : new_data_files_by_spec_) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id)); + std::vector> files; + files.reserve(data_files.size()); + std::ranges::copy(data_files, std::back_inserter(files)); + ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec)); + new_manifests_.insert(new_manifests_.end(), written_manifests.begin(), + written_manifests.end()); + } + has_new_files_ = false; + } + + return new_manifests_; +} + +} // namespace iceberg diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h new file mode 100644 index 000000000..f49c059ca --- /dev/null +++ b/src/iceberg/update/fast_append.h @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/fast_append.h + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/append_files.h" +#include "iceberg/update/snapshot_update.h" + +namespace iceberg { + +/// \brief Append implementation that adds new manifest files for writes. +/// +/// FastAppend is optimized for appending new data files to a table, it creates new +/// manifest files for the added data without compacting or rewriting existing manifests, +/// making it faster for write-heavy workloads. +class ICEBERG_EXPORT FastAppend : public SnapshotUpdate, public AppendFiles { + public: + /// \brief Create a new FastAppend instance. + /// + /// \param table_name The name of the table + /// \param transaction The transaction to use for this update + /// \return A Result containing the FastAppend instance or an error + static Result> Make( + std::string table_name, std::shared_ptr transaction); + + /// \brief Append a data file to this update. + /// + /// \param file The data file to append + /// \return Reference to this for method chaining + FastAppend& AppendFile(std::shared_ptr file) override; + + /// \brief Append a manifest file to this update. + /// + /// The manifest must only contain added files (no existing or deleted files). + /// If the manifest doesn't have a snapshot ID assigned and snapshot ID inheritance + /// is enabled, it will be used directly. Otherwise, it will be copied with the + /// new snapshot ID. + /// + /// \param manifest The manifest file to append + /// \return Reference to this for method chaining + FastAppend& AppendManifest(const ManifestFile& manifest) override; + + /// \brief Set the target branch for this update. + /// + /// \param branch The branch name + /// \return Reference to this for method chaining + FastAppend& ToBranch(const std::string& branch); + + /// \brief Set a summary property. + /// + /// \param property The property name + /// \param value The property value + /// \return Reference to this for method chaining + FastAppend& Set(const std::string& property, const std::string& value); + + Kind kind() const override { return Kind::kUpdateSnapshot; } + + std::string operation() override; + + Result> Apply( + const TableMetadata& metadata_to_update, + const std::shared_ptr& snapshot) override; + std::unordered_map Summary() override; + void CleanUncommitted(const std::unordered_set& committed) override; + bool CleanupAfterCommit() const override; + + private: + explicit FastAppend(std::string table_name, std::shared_ptr transaction); + + /// \brief Get the partition spec by spec ID. + Result> Spec(int32_t spec_id); + + /// \brief Copy a manifest file with a new snapshot ID. + /// + /// \param manifest The manifest to copy + /// \return The copied manifest file + Result CopyManifest(const ManifestFile& manifest); + + /// \brief Write new manifests for the accumulated data files. + /// + /// \return A vector of manifest files, or an error + Result> WriteNewManifests(); + + private: + struct DataFilePtrHash { + size_t operator()(const std::shared_ptr& file) const { + if (!file) { + return 0; + } + return std::hash{}(file->file_path); + } + }; + + struct DataFilePtrEqual { + bool operator()(const std::shared_ptr& left, + const std::shared_ptr& right) const { + if (left == right) { + return true; + } + if (!left || !right) { + return false; + } + return left->file_path == right->file_path; + } + }; + + std::string table_name_; + SnapshotSummaryBuilder summary_; + std::unordered_map, + DataFilePtrHash, DataFilePtrEqual>> + new_data_files_by_spec_; + std::vector append_manifests_; + std::vector rewritten_append_manifests_; + std::vector new_manifests_; + bool has_new_files_{false}; + int32_t copy_manifest_count_{0}; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 3387fd11a..4ca406842 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -18,6 +18,7 @@ install_headers( [ 'expire_snapshots.h', + 'fast_append.h', 'pending_update.h', 'snapshot_update.h', 'update_location.h', diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 48ef1676f..2bb3f59e6 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -167,6 +167,12 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Get or generate the snapshot ID for the new snapshot. int64_t SnapshotId(); + /// \brief Delete a file at the given path. + /// + /// \param path The path of the file to delete + /// \return A status indicating the result of the deletion + Status DeleteFile(const std::string& path); + private: /// \brief Returns the snapshot summary from the implementation and updates totals. Result> ComputeSummary( @@ -175,7 +181,6 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Clean up all uncommitted files void CleanAll(); - Status DeleteFile(const std::string& path); std::string ManifestListPath(); std::string ManifestPath(); From 7cfc0e9ace4c0f5b9e1734d325c1d6d40fd05786 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Mon, 19 Jan 2026 22:42:33 +0800 Subject: [PATCH 2/8] fix: review comments --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/constants.h | 1 + .../manifest/manifest_util_internal.cc | 75 +++++++++++++++ src/iceberg/manifest/manifest_util_internal.h | 57 +++++++++++ src/iceberg/meson.build | 1 + src/iceberg/table_metadata.h | 1 - .../test/manifest_writer_versions_test.cc | 30 +++--- src/iceberg/type_fwd.h | 2 +- src/iceberg/update/append_files.h | 70 -------------- src/iceberg/update/fast_append.cc | 94 +++++++------------ src/iceberg/update/fast_append.h | 39 +------- src/iceberg/update/snapshot_update.h | 7 +- src/iceberg/util/content_file_util.h | 29 ++++++ 13 files changed, 225 insertions(+), 182 deletions(-) create mode 100644 src/iceberg/manifest/manifest_util_internal.cc create mode 100644 src/iceberg/manifest/manifest_util_internal.h delete mode 100644 src/iceberg/update/append_files.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index e44d9cd76..b892bd7db 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -49,6 +49,7 @@ set(ICEBERG_SOURCES manifest/manifest_group.cc manifest/manifest_list.cc manifest/manifest_reader.cc + manifest/manifest_util_internal.cc manifest/manifest_writer.cc manifest/rolling_manifest_writer.cc manifest/v1_metadata.cc diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h index 89001f09c..1d5941626 100644 --- a/src/iceberg/constants.h +++ b/src/iceberg/constants.h @@ -32,6 +32,7 @@ namespace iceberg { constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; constexpr int64_t kInvalidSnapshotId = -1; +constexpr int64_t kInvalidSequenceNumber = -1; /// \brief Stand-in for the current sequence number that will be assigned when the commit /// is successful. This is replaced when writing a manifest list by the ManifestFile /// adapter. diff --git a/src/iceberg/manifest/manifest_util_internal.cc b/src/iceberg/manifest/manifest_util_internal.cc new file mode 100644 index 000000000..416bb2ac0 --- /dev/null +++ b/src/iceberg/manifest/manifest_util_internal.cc @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/manifest_util_internal.h" + +#include +#include + +#include "iceberg/inheritable_metadata.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result CopyAppendManifest( + const ManifestFile& manifest, std::shared_ptr file_io, + std::shared_ptr schema, std::shared_ptr spec, + int64_t snapshot_id, const std::string& output_path, int8_t format_version, + SnapshotSummaryBuilder* summary_builder) { + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, file_io, schema, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + + // use metadata that will add the current snapshot's ID for the rewrite + ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, + InheritableMetadataFactory::ForCopy(snapshot_id)); + + // do not produce row IDs for the copy + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + ManifestWriter::MakeWriter(format_version, snapshot_id, output_path, file_io, spec, + schema, ManifestContent::kData)); + + // Write all entries as added entries with the new snapshot ID + for (auto& entry : entries) { + ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded, + "Manifest to copy must only contain added entries"); + + ICEBERG_RETURN_UNEXPECTED(inheritable_metadata->Apply(entry)); + + if (summary_builder != nullptr && entry.data_file != nullptr) { + summary_builder->AddedFile(*spec, *entry.data_file); + } + + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); + } + + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile()); + + return new_manifest; +} + +} // namespace iceberg diff --git a/src/iceberg/manifest/manifest_util_internal.h b/src/iceberg/manifest/manifest_util_internal.h new file mode 100644 index 000000000..6db531b7f --- /dev/null +++ b/src/iceberg/manifest/manifest_util_internal.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/manifest_util_internal.h +/// Internal utility functions for manifest operations. + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Copy an append manifest with a new snapshot ID. +/// +/// This function copies a manifest file that contains only ADDED entries, +/// rewriting it with a new snapshot ID. This is similar to Java's +/// ManifestFiles.copyAppendManifest. +/// +/// \param manifest The manifest file to copy +/// \param file_io File IO implementation to use +/// \param schema Table schema +/// \param spec Partition spec for the manifest +/// \param snapshot_id The new snapshot ID to assign to entries +/// \param output_path Path where the new manifest will be written +/// \param format_version Table format version +/// \param summary_builder Optional summary builder to update with file metrics +/// \return The copied manifest file, or an error +ICEBERG_EXPORT Result CopyAppendManifest( + const ManifestFile& manifest, std::shared_ptr file_io, + std::shared_ptr schema, std::shared_ptr spec, + int64_t snapshot_id, const std::string& output_path, int8_t format_version, + SnapshotSummaryBuilder* summary_builder = nullptr); + +} // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index d35c15a7f..bd6062394 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -67,6 +67,7 @@ iceberg_sources = files( 'manifest/manifest_group.cc', 'manifest/manifest_list.cc', 'manifest/manifest_reader.cc', + 'manifest/manifest_util_internal.cc', 'manifest/manifest_writer.cc', 'manifest/rolling_manifest_writer.cc', 'manifest/v1_metadata.cc', diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index a4165b814..6631b9d2d 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -75,7 +75,6 @@ struct ICEBERG_EXPORT TableMetadata { static constexpr int8_t kMinFormatVersionRowLineage = 3; static constexpr int8_t kMinFormatVersionDefaultValues = 3; static constexpr int64_t kInitialSequenceNumber = 0; - static constexpr int64_t kInvalidSequenceNumber = -1; static constexpr int64_t kInitialRowId = 0; static inline const std::unordered_map kMinFormatVersions = {}; diff --git a/src/iceberg/test/manifest_writer_versions_test.cc b/src/iceberg/test/manifest_writer_versions_test.cc index 70d00504a..cc4e804be 100644 --- a/src/iceberg/test/manifest_writer_versions_test.cc +++ b/src/iceberg/test/manifest_writer_versions_test.cc @@ -27,6 +27,7 @@ #include "iceberg/arrow/arrow_file_io.h" #include "iceberg/avro/avro_register.h" +#include "iceberg/constants.h" #include "iceberg/file_format.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" @@ -411,12 +412,11 @@ class ManifestWriterVersionsTest : public ::testing::Test { TEST_F(ManifestWriterVersionsTest, TestV1Write) { auto manifest = WriteManifest(/*format_version=*/1, {data_file_}); - CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber); + CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber); auto entries = ReadManifest(manifest); ASSERT_EQ(entries.size(), 1); - CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData); + CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber, + DataFile::Content::kData); } TEST_F(ManifestWriterVersionsTest, TestV1WriteDelete) { @@ -449,13 +449,12 @@ TEST_F(ManifestWriterVersionsTest, TestV1WriteWithInheritance) { TEST_F(ManifestWriterVersionsTest, TestV2Write) { auto manifest = WriteManifest(/*format_version=*/2, {data_file_}); - CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber); + CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber); auto entries = ReadManifest(manifest); ASSERT_EQ(entries.size(), 1); ASSERT_EQ(manifest.content, ManifestContent::kData); - CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData); + CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber, + DataFile::Content::kData); } TEST_F(ManifestWriterVersionsTest, TestV2WriteWithInheritance) { @@ -470,8 +469,7 @@ TEST_F(ManifestWriterVersionsTest, TestV2WriteWithInheritance) { TEST_F(ManifestWriterVersionsTest, TestV2PlusWriteDeleteV2) { auto manifest = WriteDeleteManifest(/*format_version=*/2, delete_file_); - CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber); + CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber); auto entries = ReadManifest(manifest); ASSERT_EQ(entries.size(), 1); ASSERT_EQ(manifest.content, ManifestContent::kDeletes); @@ -507,7 +505,7 @@ TEST_F(ManifestWriterVersionsTest, TestV2ManifestRewriteWithInheritance) { // rewrite the manifest file using a v2 manifest auto rewritten_manifest = RewriteManifest(manifests[0], 2); - CheckRewrittenManifest(rewritten_manifest, TableMetadata::kInvalidSequenceNumber, + CheckRewrittenManifest(rewritten_manifest, kInvalidSequenceNumber, TableMetadata::kInitialSequenceNumber); // add the v2 manifest to a v2 manifest list, with a sequence number @@ -525,14 +523,12 @@ TEST_F(ManifestWriterVersionsTest, TestV2ManifestRewriteWithInheritance) { TEST_F(ManifestWriterVersionsTest, TestV3Write) { auto manifest = WriteManifest(/*format_version=*/3, {data_file_}); - CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber); + CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber); auto entries = ReadManifest(manifest); ASSERT_EQ(entries.size(), 1); ASSERT_EQ(manifest.content, ManifestContent::kData); - CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber, - TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData, - ManifestStatus::kAdded, kFirstRowId); + CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber, + DataFile::Content::kData, ManifestStatus::kAdded, kFirstRowId); } TEST_F(ManifestWriterVersionsTest, TestV3WriteWithInheritance) { @@ -598,7 +594,7 @@ TEST_F(ManifestWriterVersionsTest, TestV3ManifestRewriteWithInheritance) { // rewrite the manifest file using a v3 manifest auto rewritten_manifest = RewriteManifest(manifests[0], 3); - CheckRewrittenManifest(rewritten_manifest, TableMetadata::kInvalidSequenceNumber, + CheckRewrittenManifest(rewritten_manifest, kInvalidSequenceNumber, TableMetadata::kInitialSequenceNumber); // add the v3 manifest to a v3 manifest list, with a sequence number diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 7706dfd7a..b8df04b1b 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -120,6 +120,7 @@ struct SnapshotLogEntry; struct SnapshotRef; struct StatisticsFile; struct TableMetadata; +class SnapshotSummaryBuilder; /// \brief Expression. class BoundPredicate; @@ -187,7 +188,6 @@ class TableUpdateContext; class Transaction; /// \brief Update family. -class AppendFiles; class ExpireSnapshots; class FastAppend; class PendingUpdate; diff --git a/src/iceberg/update/append_files.h b/src/iceberg/update/append_files.h deleted file mode 100644 index 2c255a6b9..000000000 --- a/src/iceberg/update/append_files.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include - -#include "iceberg/iceberg_export.h" -#include "iceberg/type_fwd.h" - -namespace iceberg { - -/// \brief API for appending new files in a table. -/// -/// This API accumulates file additions, produces a new Snapshot of the table, and commits -/// that snapshot as the current. -/// -/// When committing, these changes will be applied to the latest table snapshot. Commit -/// conflicts will be resolved by applying the changes to the new latest snapshot and -/// reattempting the commit. -class ICEBERG_EXPORT AppendFiles { - public: - virtual ~AppendFiles() = default; - - /// \brief Append a DataFile to the table. - /// - /// \param file A data file - /// \return Reference to this for method chaining - virtual AppendFiles& AppendFile(std::shared_ptr file) = 0; - - /// \brief Append a ManifestFile to the table. - /// - /// The manifest must contain only appended files. All files in the manifest will be - /// appended to the table in the snapshot created by this update. - /// - /// The manifest will be used directly if snapshot ID inheritance is enabled (all tables - /// with the format version > 1 or if the inheritance is enabled explicitly via table - /// properties). Otherwise, the manifest will be rewritten to assign all entries this - /// update's snapshot ID. - /// - /// If the manifest is rewritten, it is always the responsibility of the caller to - /// manage the lifecycle of the original manifest. If the manifest is used directly, it - /// should never be deleted manually if the commit succeeds as it will become part of - /// the table metadata and will be cleaned upon expiry. If the manifest gets merged with - /// others while preparing a new snapshot, it will be deleted automatically if this - /// operation is successful. If the commit fails, the manifest will never be deleted, - /// and it is up to the caller whether to delete or reuse it. - /// - /// \param file A manifest file - /// \return Reference to this for method chaining - virtual AppendFiles& AppendManifest(const ManifestFile& file) = 0; -}; - -} // namespace iceberg diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc index 575d9c78a..2e8d5f21d 100644 --- a/src/iceberg/update/fast_append.cc +++ b/src/iceberg/update/fast_append.cc @@ -19,15 +19,13 @@ #include "iceberg/update/fast_append.h" -#include #include #include #include #include "iceberg/constants.h" #include "iceberg/manifest/manifest_entry.h" -#include "iceberg/manifest/manifest_reader.h" -#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/manifest/manifest_util_internal.h" #include "iceberg/snapshot.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" @@ -40,6 +38,9 @@ namespace iceberg { Result> FastAppend::Make( std::string table_name, std::shared_ptr transaction) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create FastAppend without a transaction"); return std::unique_ptr( new FastAppend(std::move(table_name), std::move(transaction))); } @@ -72,16 +73,16 @@ FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) { "Cannot append manifest with deleted files"); ICEBERG_BUILDER_CHECK(manifest.added_snapshot_id == kInvalidSnapshotId, "Snapshot id must be assigned during commit"); - ICEBERG_BUILDER_CHECK(manifest.sequence_number == TableMetadata::kInvalidSequenceNumber, + ICEBERG_BUILDER_CHECK(manifest.sequence_number == kInvalidSequenceNumber, "Sequence number must be assigned during commit"); - if (can_inherit_snapshot_id() && (manifest.added_snapshot_id == kInvalidSnapshotId)) { + if (can_inherit_snapshot_id() && manifest.added_snapshot_id == kInvalidSnapshotId) { summary_.AddedManifest(manifest); append_manifests_.push_back(manifest); } else { // The manifest must be rewritten with this update's snapshot ID ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest)); - rewritten_append_manifests_.push_back(copied_manifest); + rewritten_append_manifests_.push_back(std::move(copied_manifest)); } return *this; @@ -104,33 +105,32 @@ Result> FastAppend::Apply( std::vector manifests; ICEBERG_ASSIGN_OR_RAISE(auto new_written_manifests, WriteNewManifests()); + manifests.reserve(new_written_manifests.size() + append_manifests_.size() + + rewritten_append_manifests_.size()); if (!new_written_manifests.empty()) { - manifests.insert(manifests.end(), new_written_manifests.begin(), - new_written_manifests.end()); + manifests.insert(manifests.end(), + std::make_move_iterator(new_written_manifests.begin()), + std::make_move_iterator(new_written_manifests.end())); } // Transform append manifests and rewritten append manifests with snapshot ID int64_t snapshot_id = SnapshotId(); - for (const auto& manifest : append_manifests_) { - ManifestFile updated = manifest; - updated.added_snapshot_id = snapshot_id; - manifests.push_back(updated); + for (auto& manifest : append_manifests_) { + manifest.added_snapshot_id = snapshot_id; } - - for (const auto& manifest : rewritten_append_manifests_) { - ManifestFile updated = manifest; - updated.added_snapshot_id = snapshot_id; - manifests.push_back(updated); + for (auto& manifest : rewritten_append_manifests_) { + manifest.added_snapshot_id = snapshot_id; } + manifests.insert(manifests.end(), append_manifests_.begin(), append_manifests_.end()); + manifests.insert(manifests.end(), rewritten_append_manifests_.begin(), + rewritten_append_manifests_.end()); // Add all manifests from the snapshot if (snapshot != nullptr) { // Use SnapshotCache to get manifests, similar to snapshot_update.cc auto cached_snapshot = SnapshotCache(snapshot.get()); - ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests_span, + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests, cached_snapshot.Manifests(transaction_->table()->io())); - std::vector snapshot_manifests(snapshot_manifests_span.begin(), - snapshot_manifests_span.end()); manifests.insert(manifests.end(), snapshot_manifests.begin(), snapshot_manifests.end()); } @@ -148,19 +148,19 @@ void FastAppend::CleanUncommitted(const std::unordered_set& committ // Clean up new manifests that were written but not committed if (!new_manifests_.empty()) { for (const auto& manifest : new_manifests_) { - if (committed.find(manifest.manifest_path) == committed.end()) { + if (!committed.contains(manifest.manifest_path)) { std::ignore = DeleteFile(manifest.manifest_path); } } new_manifests_.clear(); } - // Clean up only rewritten_append_manifests as they are always owned by the table - // Don't clean up append_manifests as they are added to the manifest list and are + // Clean up only rewritten append manifests as they are always owned by the table + // Don't clean up append manifests as they are added to the manifest list and are // not compacted if (!rewritten_append_manifests_.empty()) { for (const auto& manifest : rewritten_append_manifests_) { - if (committed.find(manifest.manifest_path) == committed.end()) { + if (!committed.contains(manifest.manifest_path)) { std::ignore = DeleteFile(manifest.manifest_path); } } @@ -169,10 +169,10 @@ void FastAppend::CleanUncommitted(const std::unordered_set& committ bool FastAppend::CleanupAfterCommit() const { // Cleanup after committing is disabled for FastAppend unless there are - // rewritten_append_manifests because: + // rewritten_append_manifests_ because: // 1.) Appended manifests are never rewritten - // 2.) Manifests which are written out as part of appendFile are already cleaned - // up between commit attempts in writeNewManifests + // 2.) Manifests which are written out as part of AppendFile are already cleaned + // up between commit attempts in WriteNewManifests return !rewritten_append_manifests_.empty(); } @@ -186,44 +186,21 @@ Result FastAppend::CopyManifest(const ManifestFile& manifest) { ICEBERG_ASSIGN_OR_RAISE(auto spec, current.PartitionSpecById(manifest.partition_spec_id)); - // Read the manifest entries - ICEBERG_ASSIGN_OR_RAISE( - auto reader, - ManifestReader::Make(manifest, transaction_->table()->io(), schema, spec)); - ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); - - // Create a new manifest writer // Generate a unique manifest path using the transaction's metadata location - std::string filename = std::format("copy-m{}.avro", copy_manifest_count_++); - std::string new_manifest_path = transaction_->MetadataFileLocation(filename); + std::string new_manifest_path = ManifestPath(); int64_t snapshot_id = SnapshotId(); - ICEBERG_ASSIGN_OR_RAISE( - auto writer, ManifestWriter::MakeWriter( - current.format_version, snapshot_id, new_manifest_path, - transaction_->table()->io(), spec, schema, ManifestContent::kData, - /*first_row_id=*/current.next_row_id)); - - // Write all entries as added entries with the new snapshot ID - for (auto& entry : entries) { - ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded, - "Manifest to copy must only contain added entries"); - entry.snapshot_id = snapshot_id; - ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); - } - - ICEBERG_RETURN_UNEXPECTED(writer->Close()); - ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile()); - - summary_.AddedManifest(new_manifest); - return new_manifest; + // Copy the manifest with the new snapshot ID. + return CopyAppendManifest(manifest, transaction_->table()->io(), schema, spec, + snapshot_id, new_manifest_path, current.format_version, + &summary_); } Result> FastAppend::WriteNewManifests() { // If there are new files and manifests were already written, clean them up if (has_new_files_ && !new_manifests_.empty()) { for (const auto& manifest : new_manifests_) { - ICEBERG_RETURN_UNEXPECTED(DeleteFile(manifest.manifest_path)); + std::ignore = DeleteFile(manifest.manifest_path); } new_manifests_.clear(); } @@ -236,8 +213,9 @@ Result> FastAppend::WriteNewManifests() { files.reserve(data_files.size()); std::ranges::copy(data_files, std::back_inserter(files)); ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec)); - new_manifests_.insert(new_manifests_.end(), written_manifests.begin(), - written_manifests.end()); + new_manifests_.insert(new_manifests_.end(), + std::make_move_iterator(written_manifests.begin()), + std::make_move_iterator(written_manifests.end())); } has_new_files_ = false; } diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h index f49c059ca..094bfff61 100644 --- a/src/iceberg/update/fast_append.h +++ b/src/iceberg/update/fast_append.h @@ -21,11 +21,9 @@ /// \file iceberg/update/fast_append.h -#include #include #include #include -#include #include #include "iceberg/iceberg_export.h" @@ -34,8 +32,8 @@ #include "iceberg/result.h" #include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" -#include "iceberg/update/append_files.h" #include "iceberg/update/snapshot_update.h" +#include "iceberg/util/content_file_util.h" namespace iceberg { @@ -44,7 +42,7 @@ namespace iceberg { /// FastAppend is optimized for appending new data files to a table, it creates new /// manifest files for the added data without compacting or rewriting existing manifests, /// making it faster for write-heavy workloads. -class ICEBERG_EXPORT FastAppend : public SnapshotUpdate, public AppendFiles { +class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { public: /// \brief Create a new FastAppend instance. /// @@ -58,7 +56,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate, public AppendFiles { /// /// \param file The data file to append /// \return Reference to this for method chaining - FastAppend& AppendFile(std::shared_ptr file) override; + FastAppend& AppendFile(std::shared_ptr file); /// \brief Append a manifest file to this update. /// @@ -69,7 +67,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate, public AppendFiles { /// /// \param manifest The manifest file to append /// \return Reference to this for method chaining - FastAppend& AppendManifest(const ManifestFile& manifest) override; + FastAppend& AppendManifest(const ManifestFile& manifest); /// \brief Set the target branch for this update. /// @@ -84,8 +82,6 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate, public AppendFiles { /// \return Reference to this for method chaining FastAppend& Set(const std::string& property, const std::string& value); - Kind kind() const override { return Kind::kUpdateSnapshot; } - std::string operation() override; Result> Apply( @@ -113,38 +109,13 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate, public AppendFiles { Result> WriteNewManifests(); private: - struct DataFilePtrHash { - size_t operator()(const std::shared_ptr& file) const { - if (!file) { - return 0; - } - return std::hash{}(file->file_path); - } - }; - - struct DataFilePtrEqual { - bool operator()(const std::shared_ptr& left, - const std::shared_ptr& right) const { - if (left == right) { - return true; - } - if (!left || !right) { - return false; - } - return left->file_path == right->file_path; - } - }; - std::string table_name_; SnapshotSummaryBuilder summary_; - std::unordered_map, - DataFilePtrHash, DataFilePtrEqual>> - new_data_files_by_spec_; + std::unordered_map new_data_files_by_spec_; std::vector append_manifests_; std::vector rewritten_append_manifests_; std::vector new_manifests_; bool has_new_files_{false}; - int32_t copy_manifest_count_{0}; }; } // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 2bb3f59e6..4b9d60bf4 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -51,6 +51,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { ~SnapshotUpdate() override; + Kind kind() const override { return Kind::kUpdateSnapshot; } + /// \brief Set a callback to delete files instead of the table's default. /// /// \param delete_func A function used to delete file locations @@ -95,6 +97,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \param spec The partition spec to use /// \param data_sequence_number Optional data sequence number for the files /// \return A vector of manifest files + /// TODO(xxx): Change signature to accept iterator begin/end instead of vector to avoid + /// intermediate vector allocations (e.g., from DataFileSet) Result> WriteDataManifests( const std::vector>& data_files, const std::shared_ptr& spec, @@ -173,6 +177,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \return A status indicating the result of the deletion Status DeleteFile(const std::string& path); + std::string ManifestPath(); + private: /// \brief Returns the snapshot summary from the implementation and updates totals. Result> ComputeSummary( @@ -182,7 +188,6 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { void CleanAll(); std::string ManifestListPath(); - std::string ManifestPath(); private: const bool can_inherit_snapshot_id_{true}; diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h index e173a41aa..912055f5b 100644 --- a/src/iceberg/util/content_file_util.h +++ b/src/iceberg/util/content_file_util.h @@ -22,6 +22,7 @@ /// \file iceberg/util/content_file_util.h /// Utility functions for content files (data files and delete files). +#include #include #include #include @@ -35,6 +36,34 @@ namespace iceberg { +/// \brief Hash functor for std::shared_ptr based on file path. +struct ICEBERG_EXPORT DataFilePtrHash { + size_t operator()(const std::shared_ptr& file) const { + if (!file) { + return 0; + } + return std::hash{}(file->file_path); + } +}; + +/// \brief Equality functor for std::shared_ptr based on file path. +struct ICEBERG_EXPORT DataFilePtrEqual { + bool operator()(const std::shared_ptr& left, + const std::shared_ptr& right) const { + if (left == right) { + return true; + } + if (!left || !right) { + return false; + } + return left->file_path == right->file_path; + } +}; + +/// \brief A set of DataFile pointers, deduplicated by file path. +using DataFileSet = + std::unordered_set, DataFilePtrHash, DataFilePtrEqual>; + /// \brief Utility functions for content files. struct ICEBERG_EXPORT ContentFileUtil { /// \brief Check if a delete file is a deletion vector (DV). From 610cf8068cd0a4efb56b44bd016301908df51790 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 20 Jan 2026 00:23:04 +0800 Subject: [PATCH 3/8] Make DataFileSet preserve insertion order for v3 row ID assignment Change DataFileSet from std::unordered_set to a custom class that preserves insertion order, similar to Java's DataFileSet which uses LinkedHashSet. This is important for row ID assignment in v3 manifests, where row IDs are assigned based on the order files are written. The implementation uses both a vector (for insertion order) and an unordered_set (for O(1) duplicate detection) to maintain the same API while preserving order. --- src/iceberg/util/content_file_util.h | 85 +++++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 2 deletions(-) diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h index 912055f5b..7ba04eb75 100644 --- a/src/iceberg/util/content_file_util.h +++ b/src/iceberg/util/content_file_util.h @@ -22,12 +22,14 @@ /// \file iceberg/util/content_file_util.h /// Utility functions for content files (data files and delete files). +#include #include #include #include #include #include #include +#include #include "iceberg/iceberg_export.h" #include "iceberg/manifest/manifest_entry.h" @@ -61,8 +63,87 @@ struct ICEBERG_EXPORT DataFilePtrEqual { }; /// \brief A set of DataFile pointers, deduplicated by file path. -using DataFileSet = - std::unordered_set, DataFilePtrHash, DataFilePtrEqual>; +/// +/// This preserves insertion order, which is important for row ID assignment in v3 +/// manifests. Similar to Java's DataFileSet which uses LinkedHashSet to maintain +/// insertion order. +class ICEBERG_EXPORT DataFileSet { + public: + using value_type = std::shared_ptr; + using iterator = typename std::vector::iterator; + using const_iterator = typename std::vector::const_iterator; + + DataFileSet() = default; + + /// \brief Insert a data file into the set. + /// \param file The data file to insert + /// \return A pair with an iterator to the inserted element (or the existing one) and + /// a bool indicating whether insertion took place + std::pair insert(const value_type& file) { + if (!file) { + return {elements_.end(), false}; + } + // Check if file already exists using the hash set for O(1) lookup + auto [hash_iter, hash_inserted] = hash_set_.insert(file); + if (!hash_inserted) { + // File already exists, find it in the vector using the element from hash_set_ + const auto& existing_file = *hash_iter; + auto vec_iter = std::ranges::find_if(elements_, [&existing_file](const auto& elem) { + return DataFilePtrEqual{}(elem, existing_file); + }); + return {vec_iter, false}; + } + elements_.push_back(*hash_iter); + return {std::prev(elements_.end()), true}; + } + + /// \brief Insert a data file into the set (move version). + std::pair insert(value_type&& file) { + if (!file) { + return {elements_.end(), false}; + } + // Check if file already exists + auto [hash_iter, hash_inserted] = hash_set_.insert(file); + if (!hash_inserted) { + // File already exists, find it in the vector using the element from hash_set_ + const auto& existing_file = *hash_iter; + auto vec_iter = std::ranges::find_if(elements_, [&existing_file](const auto& elem) { + return DataFilePtrEqual{}(elem, existing_file); + }); + return {vec_iter, false}; + } + elements_.push_back(*hash_iter); + return {std::prev(elements_.end()), true}; + } + + /// \brief Get the number of elements in the set. + size_t size() const { return elements_.size(); } + + /// \brief Check if the set is empty. + bool empty() const { return elements_.empty(); } + + /// \brief Clear all elements from the set. + void clear() { + elements_.clear(); + hash_set_.clear(); + } + + /// \brief Get iterator to the beginning. + iterator begin() { return elements_.begin(); } + const_iterator begin() const { return elements_.begin(); } + const_iterator cbegin() const { return elements_.cbegin(); } + + /// \brief Get iterator to the end. + iterator end() { return elements_.end(); } + const_iterator end() const { return elements_.end(); } + const_iterator cend() const { return elements_.cend(); } + + private: + // Vector to preserve insertion order + std::vector elements_; + // Hash set for O(1) duplicate detection + std::unordered_set hash_set_; +}; /// \brief Utility functions for content files. struct ICEBERG_EXPORT ContentFileUtil { From 004a162cd0f79d45f58541b5a93d7a6d57a126f3 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 20 Jan 2026 00:36:59 +0800 Subject: [PATCH 4/8] fix: windows build error --- src/iceberg/manifest/manifest_util_internal.cc | 2 +- src/iceberg/manifest/manifest_util_internal.h | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/iceberg/manifest/manifest_util_internal.cc b/src/iceberg/manifest/manifest_util_internal.cc index 416bb2ac0..3a611262b 100644 --- a/src/iceberg/manifest/manifest_util_internal.cc +++ b/src/iceberg/manifest/manifest_util_internal.cc @@ -60,7 +60,7 @@ Result CopyAppendManifest( ICEBERG_RETURN_UNEXPECTED(inheritable_metadata->Apply(entry)); if (summary_builder != nullptr && entry.data_file != nullptr) { - summary_builder->AddedFile(*spec, *entry.data_file); + ICEBERG_RETURN_UNEXPECTED(summary_builder->AddedFile(*spec, *entry.data_file)); } ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); diff --git a/src/iceberg/manifest/manifest_util_internal.h b/src/iceberg/manifest/manifest_util_internal.h index 6db531b7f..25606912e 100644 --- a/src/iceberg/manifest/manifest_util_internal.h +++ b/src/iceberg/manifest/manifest_util_internal.h @@ -24,7 +24,6 @@ #include #include -#include #include #include "iceberg/iceberg_export.h" From 310fae734205b352de85d4a5dd083326294a027c Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 20 Jan 2026 12:56:11 +0800 Subject: [PATCH 5/8] fix: move Set to SnapshotUpdate --- src/iceberg/update/fast_append.cc | 5 ----- src/iceberg/update/fast_append.h | 8 -------- src/iceberg/update/snapshot_update.h | 14 ++++++++++++++ 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc index 2e8d5f21d..e4d65c073 100644 --- a/src/iceberg/update/fast_append.cc +++ b/src/iceberg/update/fast_append.cc @@ -93,11 +93,6 @@ FastAppend& FastAppend::ToBranch(const std::string& branch) { return *this; } -FastAppend& FastAppend::Set(const std::string& property, const std::string& value) { - summary_.Set(property, value); - return *this; -} - std::string FastAppend::operation() { return DataOperation::kAppend; } Result> FastAppend::Apply( diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h index 094bfff61..c0dbfc649 100644 --- a/src/iceberg/update/fast_append.h +++ b/src/iceberg/update/fast_append.h @@ -75,13 +75,6 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { /// \return Reference to this for method chaining FastAppend& ToBranch(const std::string& branch); - /// \brief Set a summary property. - /// - /// \param property The property name - /// \param value The property value - /// \return Reference to this for method chaining - FastAppend& Set(const std::string& property, const std::string& value); - std::string operation() override; Result> Apply( @@ -110,7 +103,6 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { private: std::string table_name_; - SnapshotSummaryBuilder summary_; std::unordered_map new_data_files_by_spec_; std::vector append_manifests_; std::vector rewritten_append_manifests_; diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 4b9d60bf4..c3d7eb452 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -76,6 +76,16 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } + /// \brief Set a summary property. + /// + /// \param property The property name + /// \param value The property value + /// \return Reference to this for method chaining + auto& Set(this auto& self, const std::string& property, const std::string& value) { + self.summary_.Set(property, value); + return self; + } + /// \brief Apply the update's changes to create a new snapshot. /// /// This method validates the changes, applies them to the metadata, @@ -179,6 +189,10 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { std::string ManifestPath(); + protected: + /// \brief Builder for tracking snapshot summary properties and metrics. + SnapshotSummaryBuilder summary_; + private: /// \brief Returns the snapshot summary from the implementation and updates totals. Result> ComputeSummary( From 7118dcc7462f4277bba50d7ffad83cf88c905230 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 20 Jan 2026 14:12:34 +0800 Subject: [PATCH 6/8] fix: more review comments --- src/iceberg/CMakeLists.txt | 2 +- src/iceberg/manifest/manifest_reader.cc | 18 ++++ src/iceberg/manifest/manifest_reader.h | 16 ++++ ...fest_util_internal.cc => manifest_util.cc} | 34 +++----- src/iceberg/manifest/manifest_util_internal.h | 4 +- src/iceberg/meson.build | 2 +- src/iceberg/test/fast_append_test.cc | 22 ++--- src/iceberg/test/update_test_base.h | 17 +++- src/iceberg/type_fwd.h | 1 + src/iceberg/update/fast_append.cc | 8 +- src/iceberg/update/fast_append.h | 13 +-- src/iceberg/update/snapshot_update.h | 3 +- src/iceberg/util/content_file_util.h | 87 +++++-------------- 13 files changed, 102 insertions(+), 125 deletions(-) rename src/iceberg/manifest/{manifest_util_internal.cc => manifest_util.cc} (71%) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index b892bd7db..9ff802a17 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -49,7 +49,7 @@ set(ICEBERG_SOURCES manifest/manifest_group.cc manifest/manifest_list.cc manifest/manifest_reader.cc - manifest/manifest_util_internal.cc + manifest/manifest_util.cc manifest/manifest_writer.cc manifest/rolling_manifest_writer.cc manifest/v1_metadata.cc diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 693b9fc55..bdea0209e 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -32,6 +32,7 @@ #include "iceberg/expression/expression.h" #include "iceberg/expression/projections.h" #include "iceberg/file_format.h" +#include "iceberg/inheritable_metadata.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader_internal.h" @@ -1012,6 +1013,23 @@ Result> ManifestReader::Make( std::move(spec), std::move(inheritable_metadata), std::nullopt); } +Result> ManifestReader::Make( + const ManifestFile& manifest, std::shared_ptr file_io, + std::shared_ptr schema, std::shared_ptr spec, + std::unique_ptr inheritable_metadata, + std::optional first_row_id) { + if (file_io == nullptr || schema == nullptr || spec == nullptr || + inheritable_metadata == nullptr) { + return InvalidArgument( + "FileIO, Schema, PartitionSpec, and InheritableMetadata cannot be null to create " + "ManifestReader"); + } + + return std::make_unique( + manifest.manifest_path, manifest.manifest_length, std::move(file_io), + std::move(schema), std::move(spec), std::move(inheritable_metadata), first_row_id); +} + Result> ManifestListReader::Make( std::string_view manifest_list_location, std::shared_ptr file_io) { std::shared_ptr schema = ManifestFile::Type()->ToSchema(); diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h index ddfefc57d..1c54ec4f7 100644 --- a/src/iceberg/manifest/manifest_reader.h +++ b/src/iceberg/manifest/manifest_reader.h @@ -100,6 +100,22 @@ class ICEBERG_EXPORT ManifestReader { std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr schema, std::shared_ptr spec); + /// \brief Creates a reader for a manifest file with explicit inheritable metadata. + /// \param manifest A ManifestFile object containing metadata about the manifest. + /// \param file_io File IO implementation to use. + /// \param schema Schema used to bind the partition type. + /// \param spec Partition spec used for this manifest file. + /// \param inheritable_metadata Inheritable metadata to use (instead of extracting from + /// manifest). + /// \param first_row_id First row ID to use (nullopt to clear first_row_id from + /// entries). + /// \return A Result containing the reader or an error. + static Result> Make( + const ManifestFile& manifest, std::shared_ptr file_io, + std::shared_ptr schema, std::shared_ptr spec, + std::unique_ptr inheritable_metadata, + std::optional first_row_id); + /// \brief Add stats columns to the column list if needed. static std::vector WithStatsColumns( const std::vector& columns); diff --git a/src/iceberg/manifest/manifest_util_internal.cc b/src/iceberg/manifest/manifest_util.cc similarity index 71% rename from src/iceberg/manifest/manifest_util_internal.cc rename to src/iceberg/manifest/manifest_util.cc index 3a611262b..b4b27830e 100644 --- a/src/iceberg/manifest/manifest_util_internal.cc +++ b/src/iceberg/manifest/manifest_util.cc @@ -17,14 +17,13 @@ * under the License. */ -#include "iceberg/manifest/manifest_util_internal.h" - #include #include #include "iceberg/inheritable_metadata.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_util_internal.h" #include "iceberg/manifest/manifest_writer.h" #include "iceberg/result.h" #include "iceberg/schema.h" @@ -34,31 +33,28 @@ namespace iceberg { Result CopyAppendManifest( - const ManifestFile& manifest, std::shared_ptr file_io, - std::shared_ptr schema, std::shared_ptr spec, + const ManifestFile& manifest, const std::shared_ptr& file_io, + const std::shared_ptr& schema, const std::shared_ptr& spec, int64_t snapshot_id, const std::string& output_path, int8_t format_version, SnapshotSummaryBuilder* summary_builder) { - ICEBERG_ASSIGN_OR_RAISE(auto reader, - ManifestReader::Make(manifest, file_io, schema, spec)); - ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); - // use metadata that will add the current snapshot's ID for the rewrite + // read first_row_id as null because this copies the incoming manifest before commit ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::ForCopy(snapshot_id)); + ICEBERG_ASSIGN_OR_RAISE( + auto reader, ManifestReader::Make(manifest, file_io, schema, spec, + std::move(inheritable_metadata), std::nullopt)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); // do not produce row IDs for the copy ICEBERG_ASSIGN_OR_RAISE( - auto writer, - ManifestWriter::MakeWriter(format_version, snapshot_id, output_path, file_io, spec, - schema, ManifestContent::kData)); + auto writer, ManifestWriter::MakeWriter( + format_version, snapshot_id, output_path, file_io, spec, schema, + ManifestContent::kData, /*first_row_id*/ std::nullopt)); - // Write all entries as added entries with the new snapshot ID for (auto& entry : entries) { - ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded, - "Manifest to copy must only contain added entries"); - - ICEBERG_RETURN_UNEXPECTED(inheritable_metadata->Apply(entry)); - + ICEBERG_CHECK(entry.status == ManifestStatus::kAdded, + "Manifest to copy must only contain added entries"); if (summary_builder != nullptr && entry.data_file != nullptr) { ICEBERG_RETURN_UNEXPECTED(summary_builder->AddedFile(*spec, *entry.data_file)); } @@ -67,9 +63,7 @@ Result CopyAppendManifest( } ICEBERG_RETURN_UNEXPECTED(writer->Close()); - ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile()); - - return new_manifest; + return writer->ToManifestFile(); } } // namespace iceberg diff --git a/src/iceberg/manifest/manifest_util_internal.h b/src/iceberg/manifest/manifest_util_internal.h index 25606912e..e68437d08 100644 --- a/src/iceberg/manifest/manifest_util_internal.h +++ b/src/iceberg/manifest/manifest_util_internal.h @@ -48,8 +48,8 @@ namespace iceberg { /// \param summary_builder Optional summary builder to update with file metrics /// \return The copied manifest file, or an error ICEBERG_EXPORT Result CopyAppendManifest( - const ManifestFile& manifest, std::shared_ptr file_io, - std::shared_ptr schema, std::shared_ptr spec, + const ManifestFile& manifest, const std::shared_ptr& file_io, + const std::shared_ptr& schema, const std::shared_ptr& spec, int64_t snapshot_id, const std::string& output_path, int8_t format_version, SnapshotSummaryBuilder* summary_builder = nullptr); diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index bd6062394..febe39c9a 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -67,7 +67,7 @@ iceberg_sources = files( 'manifest/manifest_group.cc', 'manifest/manifest_list.cc', 'manifest/manifest_reader.cc', - 'manifest/manifest_util_internal.cc', + 'manifest/manifest_util.cc', 'manifest/manifest_writer.cc', 'manifest/rolling_manifest_writer.cc', 'manifest/v1_metadata.cc', diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc index 729ac97ee..7c79d5e9e 100644 --- a/src/iceberg/test/fast_append_test.cc +++ b/src/iceberg/test/fast_append_test.cc @@ -40,27 +40,19 @@ class FastAppendTest : public UpdateTestBase { static void SetUpTestSuite() { avro::RegisterAll(); } void SetUp() override { - UpdateTestBase::SetUp(); - - ASSERT_THAT(catalog_->DropTable(table_ident_, /*purge=*/false), IsOk()); - - auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", - table_location_, Uuid::GenerateV7().ToString()); - ICEBERG_UNWRAP_OR_FAIL( - auto metadata, ReadTableMetadataFromResource("TableMetadataV2ValidMinimal.json")); - metadata->location = table_location_; - ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), - IsOk()); - ICEBERG_UNWRAP_OR_FAIL(table_, - catalog_->RegisterTable(table_ident_, metadata_location)); + InitializeFileIO(); + // Use minimal metadata for FastAppend tests + RegisterTableFromResource("TableMetadataV2ValidMinimal.json"); // Get partition spec and schema from the base table ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); // Create test data files - file_a_ = CreateDataFile("/data/file_a.parquet", 100, 1024); - file_b_ = CreateDataFile("/data/file_b.parquet", 200, 2048); + file_a_ = + CreateDataFile("/data/file_a.parquet", /*size=*/100, /*partition_value=*/1024); + file_b_ = + CreateDataFile("/data/file_b.parquet", /*size=*/200, /*partition_value=*/2048); } std::shared_ptr CreateDataFile(const std::string& path, int64_t record_count, diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h index c78dc4d0e..c14cb76b9 100644 --- a/src/iceberg/test/update_test_base.h +++ b/src/iceberg/test/update_test_base.h @@ -41,6 +41,12 @@ namespace iceberg { class UpdateTestBase : public ::testing::Test { protected: void SetUp() override { + InitializeFileIO(); + RegisterTableFromResource("TableMetadataV2Valid.json"); + } + + /// \brief Initialize file IO and create necessary directories. + void InitializeFileIO() { file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); catalog_ = InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", /*properties=*/{}); @@ -50,12 +56,19 @@ class UpdateTestBase : public ::testing::Test { static_cast(*file_io_).fs()); ASSERT_TRUE(arrow_fs != nullptr); ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok()); + } + + /// \brief Register a table from a metadata resource file. + /// + /// \param resource_name The name of the metadata resource file + void RegisterTableFromResource(const std::string& resource_name) { + // Drop existing table if it exists + std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false); // Write table metadata to the table location. auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", table_location_, Uuid::GenerateV7().ToString()); - ICEBERG_UNWRAP_OR_FAIL(auto metadata, - ReadTableMetadataFromResource("TableMetadataV2Valid.json")); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource(resource_name)); metadata->location = table_location_; ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), IsOk()); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index b8df04b1b..5bf03a00d 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -120,6 +120,7 @@ struct SnapshotLogEntry; struct SnapshotRef; struct StatisticsFile; struct TableMetadata; +class InheritableMetadata; class SnapshotSummaryBuilder; /// \brief Expression. diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc index e4d65c073..c7f66f2fb 100644 --- a/src/iceberg/update/fast_append.cc +++ b/src/iceberg/update/fast_append.cc @@ -48,7 +48,7 @@ Result> FastAppend::Make( FastAppend::FastAppend(std::string table_name, std::shared_ptr transaction) : SnapshotUpdate(std::move(transaction)), table_name_(std::move(table_name)) {} -FastAppend& FastAppend::AppendFile(std::shared_ptr file) { +FastAppend& FastAppend::AppendFile(const std::shared_ptr& file) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(), "Data file must have partition spec ID"); @@ -88,11 +88,6 @@ FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) { return *this; } -FastAppend& FastAppend::ToBranch(const std::string& branch) { - ICEBERG_BUILDER_RETURN_IF_ERROR(SetTargetBranch(branch)); - return *this; -} - std::string FastAppend::operation() { return DataOperation::kAppend; } Result> FastAppend::Apply( @@ -122,7 +117,6 @@ Result> FastAppend::Apply( // Add all manifests from the snapshot if (snapshot != nullptr) { - // Use SnapshotCache to get manifests, similar to snapshot_update.cc auto cached_snapshot = SnapshotCache(snapshot.get()); ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests, cached_snapshot.Manifests(transaction_->table()->io())); diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h index c0dbfc649..87887c74d 100644 --- a/src/iceberg/update/fast_append.h +++ b/src/iceberg/update/fast_append.h @@ -27,17 +27,14 @@ #include #include "iceberg/iceberg_export.h" -#include "iceberg/manifest/manifest_entry.h" -#include "iceberg/manifest/manifest_list.h" #include "iceberg/result.h" -#include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/util/content_file_util.h" namespace iceberg { -/// \brief Append implementation that adds new manifest files for writes. +/// \brief Appending new files in a table. /// /// FastAppend is optimized for appending new data files to a table, it creates new /// manifest files for the added data without compacting or rewriting existing manifests, @@ -56,7 +53,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { /// /// \param file The data file to append /// \return Reference to this for method chaining - FastAppend& AppendFile(std::shared_ptr file); + FastAppend& AppendFile(const std::shared_ptr& file); /// \brief Append a manifest file to this update. /// @@ -69,12 +66,6 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { /// \return Reference to this for method chaining FastAppend& AppendManifest(const ManifestFile& manifest); - /// \brief Set the target branch for this update. - /// - /// \param branch The branch name - /// \return Reference to this for method chaining - FastAppend& ToBranch(const std::string& branch); - std::string operation() override; Result> Apply( diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index c3d7eb452..ac1b5ce08 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -188,6 +188,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { Status DeleteFile(const std::string& path); std::string ManifestPath(); + std::string ManifestListPath(); protected: /// \brief Builder for tracking snapshot summary properties and metrics. @@ -201,8 +202,6 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Clean up all uncommitted files void CleanAll(); - std::string ManifestListPath(); - private: const bool can_inherit_snapshot_id_{true}; const std::string commit_uuid_; diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h index 7ba04eb75..ddbc6ebb5 100644 --- a/src/iceberg/util/content_file_util.h +++ b/src/iceberg/util/content_file_util.h @@ -38,40 +38,14 @@ namespace iceberg { -/// \brief Hash functor for std::shared_ptr based on file path. -struct ICEBERG_EXPORT DataFilePtrHash { - size_t operator()(const std::shared_ptr& file) const { - if (!file) { - return 0; - } - return std::hash{}(file->file_path); - } -}; - -/// \brief Equality functor for std::shared_ptr based on file path. -struct ICEBERG_EXPORT DataFilePtrEqual { - bool operator()(const std::shared_ptr& left, - const std::shared_ptr& right) const { - if (left == right) { - return true; - } - if (!left || !right) { - return false; - } - return left->file_path == right->file_path; - } -}; - -/// \brief A set of DataFile pointers, deduplicated by file path. -/// -/// This preserves insertion order, which is important for row ID assignment in v3 -/// manifests. Similar to Java's DataFileSet which uses LinkedHashSet to maintain -/// insertion order. +/// \brief A set of DataFile pointers with insertion order preserved and deduplicated by +/// file path. class ICEBERG_EXPORT DataFileSet { public: using value_type = std::shared_ptr; using iterator = typename std::vector::iterator; using const_iterator = typename std::vector::const_iterator; + using difference_type = typename std::vector::difference_type; DataFileSet() = default; @@ -79,41 +53,11 @@ class ICEBERG_EXPORT DataFileSet { /// \param file The data file to insert /// \return A pair with an iterator to the inserted element (or the existing one) and /// a bool indicating whether insertion took place - std::pair insert(const value_type& file) { - if (!file) { - return {elements_.end(), false}; - } - // Check if file already exists using the hash set for O(1) lookup - auto [hash_iter, hash_inserted] = hash_set_.insert(file); - if (!hash_inserted) { - // File already exists, find it in the vector using the element from hash_set_ - const auto& existing_file = *hash_iter; - auto vec_iter = std::ranges::find_if(elements_, [&existing_file](const auto& elem) { - return DataFilePtrEqual{}(elem, existing_file); - }); - return {vec_iter, false}; - } - elements_.push_back(*hash_iter); - return {std::prev(elements_.end()), true}; - } + std::pair insert(const value_type& file) { return InsertImpl(file); } /// \brief Insert a data file into the set (move version). std::pair insert(value_type&& file) { - if (!file) { - return {elements_.end(), false}; - } - // Check if file already exists - auto [hash_iter, hash_inserted] = hash_set_.insert(file); - if (!hash_inserted) { - // File already exists, find it in the vector using the element from hash_set_ - const auto& existing_file = *hash_iter; - auto vec_iter = std::ranges::find_if(elements_, [&existing_file](const auto& elem) { - return DataFilePtrEqual{}(elem, existing_file); - }); - return {vec_iter, false}; - } - elements_.push_back(*hash_iter); - return {std::prev(elements_.end()), true}; + return InsertImpl(std::move(file)); } /// \brief Get the number of elements in the set. @@ -125,7 +69,7 @@ class ICEBERG_EXPORT DataFileSet { /// \brief Clear all elements from the set. void clear() { elements_.clear(); - hash_set_.clear(); + index_by_path_.clear(); } /// \brief Get iterator to the beginning. @@ -139,10 +83,25 @@ class ICEBERG_EXPORT DataFileSet { const_iterator cend() const { return elements_.cend(); } private: + std::pair InsertImpl(value_type file) { + if (!file) { + return {elements_.end(), false}; + } + + auto [index_iter, inserted] = + index_by_path_.try_emplace(file->file_path, elements_.size()); + if (!inserted) { + auto pos = static_cast(index_iter->second); + return {elements_.begin() + pos, false}; + } + + elements_.push_back(std::move(file)); + return {std::prev(elements_.end()), true}; + } + // Vector to preserve insertion order std::vector elements_; - // Hash set for O(1) duplicate detection - std::unordered_set hash_set_; + std::unordered_map index_by_path_; }; /// \brief Utility functions for content files. From 04fb4a40ac4d3365bafbb60cf94f39828da00cad Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 20 Jan 2026 15:40:06 +0800 Subject: [PATCH 7/8] fix: windows build failure --- src/iceberg/manifest/manifest_reader.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h index 1c54ec4f7..6d5c60f9a 100644 --- a/src/iceberg/manifest/manifest_reader.h +++ b/src/iceberg/manifest/manifest_reader.h @@ -22,7 +22,9 @@ /// \file iceberg/manifest/manifest_reader.h /// Data reader interface for manifest files. +#include #include +#include #include #include #include From e2e216d52a36b635b662769368e949030fe0487e Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 20 Jan 2026 17:29:24 +0800 Subject: [PATCH 8/8] attempt to fix manifest reader --- src/iceberg/inheritable_metadata.cc | 8 +++--- src/iceberg/inheritable_metadata.h | 8 +++++- src/iceberg/manifest/manifest_reader.cc | 33 ++++++++----------------- src/iceberg/manifest/manifest_reader.h | 24 ++++++------------ src/iceberg/manifest/manifest_util.cc | 8 +++--- src/iceberg/update/snapshot_update.h | 8 +++--- src/iceberg/util/content_file_util.h | 2 -- 7 files changed, 38 insertions(+), 53 deletions(-) diff --git a/src/iceberg/inheritable_metadata.cc b/src/iceberg/inheritable_metadata.cc index 1d740b5c3..7ff2ddbcb 100644 --- a/src/iceberg/inheritable_metadata.cc +++ b/src/iceberg/inheritable_metadata.cc @@ -21,14 +21,16 @@ #include -#include - #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" -#include "iceberg/snapshot.h" namespace iceberg { +InheritableMetadata::~InheritableMetadata() = default; +BaseInheritableMetadata::~BaseInheritableMetadata() = default; +CopyInheritableMetadata::~CopyInheritableMetadata() = default; +EmptyInheritableMetadata::~EmptyInheritableMetadata() = default; + BaseInheritableMetadata::BaseInheritableMetadata(int32_t spec_id, int64_t snapshot_id, int64_t sequence_number, std::string manifest_location) diff --git a/src/iceberg/inheritable_metadata.h b/src/iceberg/inheritable_metadata.h index f06693a42..8b5ddadc7 100644 --- a/src/iceberg/inheritable_metadata.h +++ b/src/iceberg/inheritable_metadata.h @@ -39,7 +39,7 @@ namespace iceberg { /// from the manifest file. This interface provides a way to apply such inheritance rules. class ICEBERG_EXPORT InheritableMetadata { public: - virtual ~InheritableMetadata() = default; + virtual ~InheritableMetadata(); /// \brief Apply inheritable metadata to a manifest entry. /// \param entry The manifest entry to modify. @@ -61,6 +61,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata { Status Apply(ManifestEntry& entry) override; + ~BaseInheritableMetadata() override; + private: int32_t spec_id_; int64_t snapshot_id_; @@ -72,6 +74,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata { class ICEBERG_EXPORT EmptyInheritableMetadata : public InheritableMetadata { public: Status Apply(ManifestEntry& entry) override; + + ~EmptyInheritableMetadata() override; }; /// \brief Metadata inheritance for copying manifests before commit. @@ -83,6 +87,8 @@ class ICEBERG_EXPORT CopyInheritableMetadata : public InheritableMetadata { Status Apply(ManifestEntry& entry) override; + ~CopyInheritableMetadata() override; + private: int64_t snapshot_id_; }; diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index bdea0209e..53100b236 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -999,34 +999,21 @@ Result> ManifestReader::Make( } Result> ManifestReader::Make( - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr schema, std::shared_ptr spec) { - if (file_io == nullptr || schema == nullptr || spec == nullptr) { - return InvalidArgument( - "FileIO, Schema, and PartitionSpec cannot be null to create ManifestReader"); - } - - // No metadata to inherit in this case. - ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty()); - return std::make_unique( - std::string(manifest_location), std::nullopt, std::move(file_io), std::move(schema), - std::move(spec), std::move(inheritable_metadata), std::nullopt); -} - -Result> ManifestReader::Make( - const ManifestFile& manifest, std::shared_ptr file_io, - std::shared_ptr schema, std::shared_ptr spec, + std::string_view manifest_location, std::optional manifest_length, + std::shared_ptr file_io, std::shared_ptr schema, + std::shared_ptr spec, std::unique_ptr inheritable_metadata, std::optional first_row_id) { - if (file_io == nullptr || schema == nullptr || spec == nullptr || - inheritable_metadata == nullptr) { - return InvalidArgument( - "FileIO, Schema, PartitionSpec, and InheritableMetadata cannot be null to create " - "ManifestReader"); + ICEBERG_PRECHECK(file_io != nullptr, "FileIO cannot be null to read manifest"); + ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null to read manifest"); + ICEBERG_PRECHECK(spec != nullptr, "PartitionSpec cannot be null to read manifest"); + + if (inheritable_metadata == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(inheritable_metadata, InheritableMetadataFactory::Empty()); } return std::make_unique( - manifest.manifest_path, manifest.manifest_length, std::move(file_io), + std::string(manifest_location), manifest_length, std::move(file_io), std::move(schema), std::move(spec), std::move(inheritable_metadata), first_row_id); } diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h index 6d5c60f9a..1a1420216 100644 --- a/src/iceberg/manifest/manifest_reader.h +++ b/src/iceberg/manifest/manifest_reader.h @@ -94,29 +94,19 @@ class ICEBERG_EXPORT ManifestReader { /// \brief Creates a reader for a manifest file. /// \param manifest_location Path to the manifest file. + /// \param manifest_length Length of the manifest file. /// \param file_io File IO implementation to use. /// \param schema Schema used to bind the partition type. /// \param spec Partition spec used for this manifest file. + /// \param inheritable_metadata Inheritable metadata. + /// \param first_row_id First row ID to use for the manifest entries. /// \return A Result containing the reader or an error. static Result> Make( - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr schema, std::shared_ptr spec); - - /// \brief Creates a reader for a manifest file with explicit inheritable metadata. - /// \param manifest A ManifestFile object containing metadata about the manifest. - /// \param file_io File IO implementation to use. - /// \param schema Schema used to bind the partition type. - /// \param spec Partition spec used for this manifest file. - /// \param inheritable_metadata Inheritable metadata to use (instead of extracting from - /// manifest). - /// \param first_row_id First row ID to use (nullopt to clear first_row_id from - /// entries). - /// \return A Result containing the reader or an error. - static Result> Make( - const ManifestFile& manifest, std::shared_ptr file_io, - std::shared_ptr schema, std::shared_ptr spec, + std::string_view manifest_location, std::optional manifest_length, + std::shared_ptr file_io, std::shared_ptr schema, + std::shared_ptr spec, std::unique_ptr inheritable_metadata, - std::optional first_row_id); + std::optional first_row_id = std::nullopt); /// \brief Add stats columns to the column list if needed. static std::vector WithStatsColumns( diff --git a/src/iceberg/manifest/manifest_util.cc b/src/iceberg/manifest/manifest_util.cc index b4b27830e..12805452d 100644 --- a/src/iceberg/manifest/manifest_util.cc +++ b/src/iceberg/manifest/manifest_util.cc @@ -42,15 +42,17 @@ Result CopyAppendManifest( ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::ForCopy(snapshot_id)); ICEBERG_ASSIGN_OR_RAISE( - auto reader, ManifestReader::Make(manifest, file_io, schema, spec, - std::move(inheritable_metadata), std::nullopt)); + auto reader, + ManifestReader::Make(manifest.manifest_path, manifest.manifest_length, file_io, + schema, spec, std::move(inheritable_metadata), + /*first_row_id=*/std::nullopt)); ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); // do not produce row IDs for the copy ICEBERG_ASSIGN_OR_RAISE( auto writer, ManifestWriter::MakeWriter( format_version, snapshot_id, output_path, file_io, spec, schema, - ManifestContent::kData, /*first_row_id*/ std::nullopt)); + ManifestContent::kData, /*first_row_id=*/std::nullopt)); for (auto& entry : entries) { ICEBERG_CHECK(entry.status == ManifestStatus::kAdded, diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index ac1b5ce08..f31327fcd 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -189,10 +189,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { std::string ManifestPath(); std::string ManifestListPath(); - - protected: - /// \brief Builder for tracking snapshot summary properties and metrics. - SnapshotSummaryBuilder summary_; + SnapshotSummaryBuilder& summary_builder() { return summary_; } private: /// \brief Returns the snapshot summary from the implementation and updates totals. @@ -202,6 +199,9 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Clean up all uncommitted files void CleanAll(); + protected: + SnapshotSummaryBuilder summary_; + private: const bool can_inherit_snapshot_id_{true}; const std::string commit_uuid_; diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h index ddbc6ebb5..95a8d6343 100644 --- a/src/iceberg/util/content_file_util.h +++ b/src/iceberg/util/content_file_util.h @@ -22,8 +22,6 @@ /// \file iceberg/util/content_file_util.h /// Utility functions for content files (data files and delete files). -#include -#include #include #include #include