Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ set(ICEBERG_SOURCES
manifest/manifest_group.cc
manifest/manifest_list.cc
manifest/manifest_reader.cc
manifest/manifest_util.cc
manifest/manifest_writer.cc
manifest/rolling_manifest_writer.cc
manifest/v1_metadata.cc
Expand Down Expand Up @@ -85,6 +86,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/expire_snapshots.cc
update/fast_append.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_location.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace iceberg {

constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
constexpr int64_t kInvalidSnapshotId = -1;
constexpr int64_t kInvalidSequenceNumber = -1;
/// \brief Stand-in for the current sequence number that will be assigned when the commit
/// is successful. This is replaced when writing a manifest list by the ManifestFile
/// adapter.
Expand Down
8 changes: 5 additions & 3 deletions src/iceberg/inheritable_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@

#include <utility>

#include <iceberg/result.h>

#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/snapshot.h"

namespace iceberg {

// Out-of-line defaulted destructors for the InheritableMetadata hierarchy; the
// matching declarations live in iceberg/inheritable_metadata.h.
// NOTE(review): presumably defined in this translation unit so that callers holding
// a std::unique_ptr<InheritableMetadata> only need the header's declaration -- confirm.
InheritableMetadata::~InheritableMetadata() = default;
BaseInheritableMetadata::~BaseInheritableMetadata() = default;
CopyInheritableMetadata::~CopyInheritableMetadata() = default;
EmptyInheritableMetadata::~EmptyInheritableMetadata() = default;

BaseInheritableMetadata::BaseInheritableMetadata(int32_t spec_id, int64_t snapshot_id,
int64_t sequence_number,
std::string manifest_location)
Expand Down
8 changes: 7 additions & 1 deletion src/iceberg/inheritable_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace iceberg {
/// from the manifest file. This interface provides a way to apply such inheritance rules.
class ICEBERG_EXPORT InheritableMetadata {
public:
virtual ~InheritableMetadata() = default;
virtual ~InheritableMetadata();

/// \brief Apply inheritable metadata to a manifest entry.
/// \param entry The manifest entry to modify.
Expand All @@ -61,6 +61,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata {

Status Apply(ManifestEntry& entry) override;

~BaseInheritableMetadata() override;

private:
int32_t spec_id_;
int64_t snapshot_id_;
Expand All @@ -72,6 +74,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public InheritableMetadata {
class ICEBERG_EXPORT EmptyInheritableMetadata : public InheritableMetadata {
public:
Status Apply(ManifestEntry& entry) override;

~EmptyInheritableMetadata() override;
};

/// \brief Metadata inheritance for copying manifests before commit.
Expand All @@ -83,6 +87,8 @@ class ICEBERG_EXPORT CopyInheritableMetadata : public InheritableMetadata {

Status Apply(ManifestEntry& entry) override;

~CopyInheritableMetadata() override;

private:
int64_t snapshot_id_;
};
Expand Down
23 changes: 14 additions & 9 deletions src/iceberg/manifest/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "iceberg/expression/expression.h"
#include "iceberg/expression/projections.h"
#include "iceberg/file_format.h"
#include "iceberg/inheritable_metadata.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader_internal.h"
Expand Down Expand Up @@ -998,18 +999,22 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
}

Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec) {
if (file_io == nullptr || schema == nullptr || spec == nullptr) {
return InvalidArgument(
"FileIO, Schema, and PartitionSpec cannot be null to create ManifestReader");
std::string_view manifest_location, std::optional<int64_t> manifest_length,
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
std::shared_ptr<PartitionSpec> spec,
std::unique_ptr<InheritableMetadata> inheritable_metadata,
std::optional<int64_t> first_row_id) {
ICEBERG_PRECHECK(file_io != nullptr, "FileIO cannot be null to read manifest");
ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null to read manifest");
ICEBERG_PRECHECK(spec != nullptr, "PartitionSpec cannot be null to read manifest");

if (inheritable_metadata == nullptr) {
ICEBERG_ASSIGN_OR_RAISE(inheritable_metadata, InheritableMetadataFactory::Empty());
}

// No metadata to inherit in this case.
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty());
return std::make_unique<ManifestReaderImpl>(
std::string(manifest_location), std::nullopt, std::move(file_io), std::move(schema),
std::move(spec), std::move(inheritable_metadata), std::nullopt);
std::string(manifest_location), manifest_length, std::move(file_io),
std::move(schema), std::move(spec), std::move(inheritable_metadata), first_row_id);
}

Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
Expand Down
12 changes: 10 additions & 2 deletions src/iceberg/manifest/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
/// \file iceberg/manifest/manifest_reader.h
/// Data reader interface for manifest files.

#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -92,13 +94,19 @@ class ICEBERG_EXPORT ManifestReader {

/// \brief Creates a reader for a manifest file.
/// \param manifest_location Path to the manifest file.
/// \param manifest_length Length of the manifest file.
/// \param file_io File IO implementation to use.
/// \param schema Schema used to bind the partition type.
/// \param spec Partition spec used for this manifest file.
/// \param inheritable_metadata Metadata to apply to the manifest entries; if null, empty inheritable metadata is used.
/// \param first_row_id First row ID to use for the manifest entries.
/// \return A Result containing the reader or an error.
static Result<std::unique_ptr<ManifestReader>> Make(
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
std::string_view manifest_location, std::optional<int64_t> manifest_length,
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
std::shared_ptr<PartitionSpec> spec,
std::unique_ptr<InheritableMetadata> inheritable_metadata,
std::optional<int64_t> first_row_id = std::nullopt);

/// \brief Add stats columns to the column list if needed.
static std::vector<std::string> WithStatsColumns(
Expand Down
71 changes: 71 additions & 0 deletions src/iceberg/manifest/manifest_util.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <memory>
#include <optional>

#include "iceberg/inheritable_metadata.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/manifest/manifest_util_internal.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/util/macros.h"

namespace iceberg {

// Copies `manifest` to `output_path`, reading its entries through copy-style
// inheritable metadata built from `snapshot_id` and re-writing each one as an added
// entry. When `summary_builder` is non-null, every entry that carries a data file is
// also reported to it. Returns the ManifestFile produced by the writer, or the first
// error encountered.
Result<ManifestFile> CopyAppendManifest(
    const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
    const std::shared_ptr<Schema>& schema, const std::shared_ptr<PartitionSpec>& spec,
    int64_t snapshot_id, const std::string& output_path, int8_t format_version,
    SnapshotSummaryBuilder* summary_builder) {
  // Read side: the metadata adds the current snapshot's ID for the rewrite, and
  // first_row_id is left null because the incoming manifest is copied before commit.
  ICEBERG_ASSIGN_OR_RAISE(auto copy_metadata,
                          InheritableMetadataFactory::ForCopy(snapshot_id));
  ICEBERG_ASSIGN_OR_RAISE(
      auto source_reader,
      ManifestReader::Make(manifest.manifest_path, manifest.manifest_length, file_io,
                           schema, spec, std::move(copy_metadata),
                           /*first_row_id=*/std::nullopt));
  ICEBERG_ASSIGN_OR_RAISE(auto source_entries, source_reader->Entries());

  // Write side: the copy must not produce row IDs, hence the null first_row_id.
  ICEBERG_ASSIGN_OR_RAISE(
      auto copy_writer, ManifestWriter::MakeWriter(
                            format_version, snapshot_id, output_path, file_io, spec,
                            schema, ManifestContent::kData,
                            /*first_row_id=*/std::nullopt));

  // Whether the caller asked for summary tracking does not change per entry.
  const bool track_summary = summary_builder != nullptr;

  for (auto& source_entry : source_entries) {
    // Only append manifests may be copied: every entry has to be an ADDED entry.
    ICEBERG_CHECK(source_entry.status == ManifestStatus::kAdded,
                  "Manifest to copy must only contain added entries");

    if (track_summary && source_entry.data_file != nullptr) {
      ICEBERG_RETURN_UNEXPECTED(
          summary_builder->AddedFile(*spec, *source_entry.data_file));
    }

    ICEBERG_RETURN_UNEXPECTED(copy_writer->WriteAddedEntry(source_entry));
  }

  // The writer has to be closed before it can describe the file it produced.
  ICEBERG_RETURN_UNEXPECTED(copy_writer->Close());
  return copy_writer->ToManifestFile();
}

} // namespace iceberg
56 changes: 56 additions & 0 deletions src/iceberg/manifest/manifest_util_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/manifest/manifest_util_internal.h
/// Internal utility functions for manifest operations.

#include <cstdint>
#include <memory>
#include <string>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Copy an append manifest with a new snapshot ID.
///
/// This function copies a manifest file that contains only ADDED entries,
/// rewriting it with a new snapshot ID. This is similar to Java's
/// ManifestFiles.copyAppendManifest.
///
/// The implementation checks that every entry read from the source manifest has
/// ADDED status. The copy is written as a data manifest, and no first row ID is
/// supplied when reading the source or when writing the copy.
///
/// \param manifest The manifest file to copy; its path and length are used to open it
/// \param file_io File IO implementation to use for both reading and writing
/// \param schema Table schema
/// \param spec Partition spec for the manifest
/// \param snapshot_id The new snapshot ID to assign to entries
/// \param output_path Path where the new manifest will be written
/// \param format_version Table format version
/// \param summary_builder Optional summary builder to update with file metrics; may be
/// nullptr (the default), in which case no summary is updated
/// \return The copied manifest file, or an error
ICEBERG_EXPORT Result<ManifestFile> CopyAppendManifest(
const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
const std::shared_ptr<Schema>& schema, const std::shared_ptr<PartitionSpec>& spec,
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
SnapshotSummaryBuilder* summary_builder = nullptr);

} // namespace iceberg
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ iceberg_sources = files(
'manifest/manifest_group.cc',
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
'manifest/manifest_util.cc',
'manifest/manifest_writer.cc',
'manifest/rolling_manifest_writer.cc',
'manifest/v1_metadata.cc',
Expand Down Expand Up @@ -103,6 +104,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/expire_snapshots.cc',
'update/fast_append.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_location.cc',
Expand Down
7 changes: 7 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
return transaction->NewUpdateLocation();
}

// Builds a FastAppend on top of a freshly created update transaction for this table.
// The transaction is created with auto-commit enabled; any error from creating it is
// returned to the caller unchanged.
Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
  constexpr bool kAutoCommit = true;
  ICEBERG_ASSIGN_OR_RAISE(auto txn,
                          Transaction::Make(shared_from_this(),
                                            Transaction::Kind::kUpdate, kAutoCommit));
  return txn->NewFastAppend();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();

/// \brief Create a new FastAppend to append data files and commit the changes.
virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();

protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
1 change: 0 additions & 1 deletion src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ struct ICEBERG_EXPORT TableMetadata {
static constexpr int8_t kMinFormatVersionRowLineage = 3;
static constexpr int8_t kMinFormatVersionDefaultValues = 3;
static constexpr int64_t kInitialSequenceNumber = 0;
static constexpr int64_t kInvalidSequenceNumber = -1;
static constexpr int64_t kInitialRowId = 0;

static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {};
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
transaction_test.cc
update_location_test.cc
update_partition_spec_test.cc
Expand Down
Loading
Loading