Skip to content

feat: basic table scan planning #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(ICEBERG_SOURCES
sort_order.cc
statistics_file.cc
table_metadata.cc
table_scan.cc
transform.cc
transform_function.cc
type.cc
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ struct ICEBERG_EXPORT ManifestEntry {
std::optional<int64_t> file_sequence_number;
/// Field id: 2
/// File path, partition tuple, metrics, ...
DataFile data_file;
std::shared_ptr<DataFile> data_file;

inline static const SchemaField kStatus =
SchemaField::MakeRequired(0, "status", std::make_shared<IntType>());
Expand Down
18 changes: 18 additions & 0 deletions src/iceberg/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace iceberg {
/// \brief Read manifest entries from a manifest file.
class ICEBERG_EXPORT ManifestReader {
public:
virtual ~ManifestReader() = default;
virtual Result<std::span<std::unique_ptr<ManifestEntry>>> Entries() const = 0;

private:
Expand All @@ -43,10 +44,27 @@ class ICEBERG_EXPORT ManifestReader {
/// \brief Read manifest files from a manifest list file.
class ICEBERG_EXPORT ManifestListReader {
public:
virtual ~ManifestListReader() = default;
virtual Result<std::span<std::unique_ptr<ManifestFile>>> Files() const = 0;

private:
std::unique_ptr<StructLikeReader> reader_;
};

/// \brief Creates a reader for the manifest list.
/// \param file_path Path to the manifest list file.
/// \return A Result containing the reader or an error.
Result<std::unique_ptr<ManifestListReader>> CreateManifestListReader(
const std::string& file_path) {
return NotImplemented("CreateManifestListReader is not implemented yet.");
}

/// \brief Creates a reader for a manifest file.
/// \param file_path Path to the manifest file.
/// \return A Result containing the reader or an error.
Result<std::unique_ptr<ManifestReader>> CreateManifestReader(
const std::string& file_path) {
return NotImplemented("CreateManifestReader is not implemented yet.");
}

} // namespace iceberg
11 changes: 5 additions & 6 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,20 @@ class ICEBERG_EXPORT Table {
/// \return a vector of history entries
virtual const std::vector<std::shared_ptr<HistoryEntry>>& history() const = 0;

/// \brief Create a new table scan for this table
/// \brief Create a new table scan builder for this table
///
/// Once a table scan is created, it can be refined to project columns and filter data.
virtual std::unique_ptr<TableScan> NewScan() const = 0;
/// Once a table scan builder is created, it can be refined to project columns and
/// filter data.
virtual std::unique_ptr<TableScanBuilder> NewScan() const = 0;

/// \brief Create a new append API to add files to this table and commit
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;

/// \brief Create a new transaction API to commit multiple table operations at once
virtual std::unique_ptr<Transaction> NewTransaction() = 0;

/// TODO(wgtmac): design of FileIO is not finalized yet. We intend to use an
/// IO-less design in the core library.
// /// \brief Returns a FileIO to read and write table data and metadata files
// virtual std::shared_ptr<FileIO> io() const = 0;
virtual std::shared_ptr<FileIO> io() const = 0;

/// \brief Returns a LocationProvider to provide locations for new data files
virtual std::unique_ptr<LocationProvider> location_provider() const = 0;
Expand Down
116 changes: 116 additions & 0 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/table_scan.h"

#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/manifest_reader.h"
#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/snapshot.h"
#include "iceberg/table.h"
#include "iceberg/util/macros.h"

namespace iceberg {

TableScanBuilder::TableScanBuilder(const Table& table) : table_(table) {}

TableScanBuilder& TableScanBuilder::WithColumnNames(
std::vector<std::string> column_names) {
column_names_ = std::move(column_names);
return *this;
}

TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
snapshot_id_ = snapshot_id;
return *this;
}

TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr<Expression> filter) {
filter_ = std::move(filter);
return *this;
}

Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
std::shared_ptr<Snapshot> snapshot;
if (snapshot_id_) {
ICEBERG_ASSIGN_OR_RAISE(snapshot, table_.snapshot(*snapshot_id_));
} else {
snapshot = table_.current_snapshot();
}
if (snapshot == nullptr) {
return InvalidArgument("No snapshot found for table {}", table_.name());
}

std::shared_ptr<Schema> schema;
if (snapshot->schema_id) {
const auto& schemas = table_.schemas();
if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) {
schema = it->second;
} else {
return InvalidArgument("Schema {} in snapshot {} is not found",
*snapshot->schema_id, snapshot->snapshot_id);
}
} else {
schema = table_.schema();
}

std::vector<SchemaField> projected_fields;
projected_fields.reserve(column_names_.size());
for (const auto& column_name : column_names_) {
auto field_opt = schema->GetFieldByName(column_name);
if (!field_opt) {
return InvalidArgument("Column {} not found in schema", column_name);
}
projected_fields.emplace_back(field_opt.value().get());
}

auto projected_schema =
std::make_shared<Schema>(std::move(projected_fields), schema->schema_id());
TableScan::ScanContext context{.snapshot = std::move(snapshot),
.projected_schema = std::move(projected_schema),
.filter = std::move(filter_)};
return std::make_unique<TableScan>(std::move(context), table_.io());
}

TableScan::TableScan(ScanContext context, std::shared_ptr<FileIO> file_io)
: context_(std::move(context)), file_io_(std::move(file_io)) {}

Result<std::vector<std::shared_ptr<FileScanTask>>> TableScan::PlanFiles() const {
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader,
CreateManifestListReader(context_.snapshot->manifest_list));
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());

std::vector<std::shared_ptr<FileScanTask>> tasks;
for (const auto& manifest_file : manifest_files) {
ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader,
CreateManifestReader(manifest_file->manifest_path));
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());

for (const auto& manifest : manifests) {
const auto& data_file = manifest->data_file;
tasks.emplace_back(
std::make_shared<FileScanTask>(data_file, 0, data_file->file_size_in_bytes));
}
}
return tasks;
}

} // namespace iceberg
96 changes: 96 additions & 0 deletions src/iceberg/table_scan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <string>
#include <vector>

#include "iceberg/manifest_entry.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Builder class for creating TableScan instances.
class ICEBERG_EXPORT TableScanBuilder {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need an abstraction for common parts of other scan builders? We may need more scan types for incremental, append-only, changelog, etc. If there's not much to abstract, I'm fine to keep it as is for now and refactor them later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in the Java implementation, the hierarchy of Scan and ScanTask has been defined. I will review this part and try to make it more abstract.

public:
/// \brief Constructs a TableScanBuilder for the given table.
/// \param table Reference to the table to scan.
explicit TableScanBuilder(const Table& table);

/// \brief Sets the snapshot ID to scan.
/// \param snapshot_id The ID of the snapshot.
/// \return Reference to the builder.
TableScanBuilder& WithSnapshotId(int64_t snapshot_id);

/// \brief Selects columns to include in the scan.
/// \param column_names A list of column names. If empty, all columns will be selected.
/// \return Reference to the builder.
TableScanBuilder& WithColumnNames(std::vector<std::string> column_names);

/// \brief Applies a filter expression to the scan.
/// \param filter Filter expression to use.
/// \return Reference to the builder.
TableScanBuilder& WithFilter(std::shared_ptr<Expression> filter);

/// \brief Builds and returns a TableScan instance.
/// \return A Result containing the TableScan or an error.
Result<std::unique_ptr<TableScan>> Build();

private:
const Table& table_;
std::vector<std::string> column_names_;
std::optional<int64_t> snapshot_id_;
std::shared_ptr<Expression> filter_;
};

/// \brief Represents a configured scan operation on a table.
class ICEBERG_EXPORT TableScan {
public:
/// \brief Scan context holding snapshot and scan-specific metadata.
struct ScanContext {
std::shared_ptr<Snapshot> snapshot; ///< Snapshot to scan.
std::shared_ptr<Schema> projected_schema; ///< Projected schema.
std::shared_ptr<Expression> filter; ///< Filter expression to apply.
};

/// \brief Constructs a TableScan with the given context and file I/O.
/// \param context Scan context including snapshot, schema, and filter.
/// \param file_io File I/O instance for reading manifests and data files.
TableScan(ScanContext context, std::shared_ptr<FileIO> file_io);

/// \brief Plans the scan tasks by resolving manifests and data files.
///
/// Returns a list of file scan tasks if successful.
/// \return A Result containing scan tasks or an error.
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const;

private:
ScanContext context_;
std::shared_ptr<FileIO> file_io_;
};

/// \brief Represents a task to scan a portion of a data file.
struct ICEBERG_EXPORT FileScanTask {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before defining FileScanTask, should we abstract it a little bit? I haven't looked at other implementations yet. From the java impl, there's a hierarchy of ScanTask and FileScanTask is a combination of ContentScanTask and PartitionScanTask.

I'm not sure if we want to take the complexity to support the java SplittableScanTask, MergableScanTask, and ScanTaskGroup. We can revisit these later.

std::shared_ptr<DataFile> data_file; ///< Data file metadata.
uint64_t start; ///< Start byte offset.
uint64_t length; ///< Length in bytes to scan.
};

} // namespace iceberg
11 changes: 10 additions & 1 deletion src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class LocationProvider;
class SortField;
class SortOrder;
class Table;
class FileIO;
class Transaction;
class Transform;
class TransformFunction;
Expand All @@ -109,6 +110,12 @@ class NameMapping;
enum class SnapshotRefType;
enum class TransformType;

class Expression;

struct FileScanTask;
class TableScan;
class TableScanBuilder;

/// ----------------------------------------------------------------------------
/// TODO: Forward declarations below are not added yet.
/// ----------------------------------------------------------------------------
Expand All @@ -120,11 +127,13 @@ class MetadataUpdate;
class UpdateRequirement;

class AppendFiles;
class TableScan;

struct DataFile;
struct ManifestEntry;
struct ManifestFile;
struct ManifestList;

class ManifestReader;
class ManifestListReader;

} // namespace iceberg
Loading