-
Notifications
You must be signed in to change notification settings - Fork 37
feat: basic table scan planning #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e971cc4
5fc6971
6a2cb74
d71c26a
c6c1a1f
cd07a0c
b7becc2
abfdfcd
28043b1
fa25891
428651f
812a545
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
#include "iceberg/table_scan.h" | ||
|
||
#include "iceberg/manifest_entry.h" | ||
#include "iceberg/manifest_list.h" | ||
#include "iceberg/manifest_reader.h" | ||
#include "iceberg/schema.h" | ||
#include "iceberg/schema_field.h" | ||
#include "iceberg/snapshot.h" | ||
#include "iceberg/table.h" | ||
#include "iceberg/util/macros.h" | ||
|
||
namespace iceberg { | ||
|
||
TableScanBuilder::TableScanBuilder(const Table& table) : table_(table) {} | ||
|
||
TableScanBuilder& TableScanBuilder::WithColumnNames( | ||
std::vector<std::string> column_names) { | ||
column_names_ = std::move(column_names); | ||
return *this; | ||
} | ||
|
||
TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) { | ||
snapshot_id_ = snapshot_id; | ||
return *this; | ||
} | ||
|
||
TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr<Expression> filter) { | ||
filter_ = std::move(filter); | ||
return *this; | ||
} | ||
|
||
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() { | ||
std::shared_ptr<Snapshot> snapshot; | ||
if (snapshot_id_) { | ||
ICEBERG_ASSIGN_OR_RAISE(snapshot, table_.snapshot(*snapshot_id_)); | ||
} else { | ||
snapshot = table_.current_snapshot(); | ||
} | ||
if (snapshot == nullptr) { | ||
return InvalidArgument("No snapshot found for table {}", table_.name()); | ||
} | ||
|
||
std::shared_ptr<Schema> schema; | ||
if (snapshot->schema_id) { | ||
const auto& schemas = table_.schemas(); | ||
if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { | ||
schema = it->second; | ||
} else { | ||
return InvalidArgument("Schema {} in snapshot {} is not found", | ||
*snapshot->schema_id, snapshot->snapshot_id); | ||
} | ||
} else { | ||
schema = table_.schema(); | ||
} | ||
|
||
std::vector<SchemaField> projected_fields; | ||
projected_fields.reserve(column_names_.size()); | ||
for (const auto& column_name : column_names_) { | ||
auto field_opt = schema->GetFieldByName(column_name); | ||
if (!field_opt) { | ||
return InvalidArgument("Column {} not found in schema", column_name); | ||
} | ||
projected_fields.emplace_back(field_opt.value().get()); | ||
} | ||
|
||
auto projected_schema = | ||
std::make_shared<Schema>(std::move(projected_fields), schema->schema_id()); | ||
TableScan::ScanContext context{.snapshot = std::move(snapshot), | ||
.projected_schema = std::move(projected_schema), | ||
.filter = std::move(filter_)}; | ||
return std::make_unique<TableScan>(std::move(context), table_.io()); | ||
} | ||
|
||
TableScan::TableScan(ScanContext context, std::shared_ptr<FileIO> file_io) | ||
: context_(std::move(context)), file_io_(std::move(file_io)) {} | ||
|
||
Result<std::vector<std::shared_ptr<FileScanTask>>> TableScan::PlanFiles() const { | ||
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader, | ||
CreateManifestListReader(context_.snapshot->manifest_list)); | ||
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); | ||
|
||
std::vector<std::shared_ptr<FileScanTask>> tasks; | ||
for (const auto& manifest_file : manifest_files) { | ||
ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader, | ||
CreateManifestReader(manifest_file->manifest_path)); | ||
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries()); | ||
|
||
for (const auto& manifest : manifests) { | ||
const auto& data_file = manifest->data_file; | ||
tasks.emplace_back( | ||
std::make_shared<FileScanTask>(data_file, 0, data_file->file_size_in_bytes)); | ||
} | ||
} | ||
return tasks; | ||
} | ||
|
||
} // namespace iceberg |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
#pragma once | ||
|
||
#include <string> | ||
#include <vector> | ||
|
||
#include "iceberg/manifest_entry.h" | ||
gty404 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
#include "iceberg/type_fwd.h" | ||
|
||
namespace iceberg { | ||
|
||
/// \brief Builder class for creating TableScan instances. | ||
class ICEBERG_EXPORT TableScanBuilder { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need an abstraction for common parts of other scan builders? We may need more scan types for incremental, append-only, changelog, etc. If there's not much to abstract, I'm fine to keep it as is for now and refactor them later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, in the Java implementation, the hierarchy of Scan and ScanTask has been defined. I will review this part and try to make it more abstract. |
||
public: | ||
/// \brief Constructs a TableScanBuilder for the given table. | ||
/// \param table Reference to the table to scan. | ||
explicit TableScanBuilder(const Table& table); | ||
|
||
/// \brief Sets the snapshot ID to scan. | ||
/// \param snapshot_id The ID of the snapshot. | ||
/// \return Reference to the builder. | ||
TableScanBuilder& WithSnapshotId(int64_t snapshot_id); | ||
|
||
/// \brief Selects columns to include in the scan. | ||
gty404 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// \param column_names A list of column names. If empty, all columns will be selected. | ||
/// \return Reference to the builder. | ||
TableScanBuilder& WithColumnNames(std::vector<std::string> column_names); | ||
|
||
/// \brief Applies a filter expression to the scan. | ||
/// \param filter Filter expression to use. | ||
/// \return Reference to the builder. | ||
TableScanBuilder& WithFilter(std::shared_ptr<Expression> filter); | ||
|
||
/// \brief Builds and returns a TableScan instance. | ||
/// \return A Result containing the TableScan or an error. | ||
Result<std::unique_ptr<TableScan>> Build(); | ||
|
||
private: | ||
const Table& table_; | ||
std::vector<std::string> column_names_; | ||
std::optional<int64_t> snapshot_id_; | ||
std::shared_ptr<Expression> filter_; | ||
}; | ||
|
||
/// \brief Represents a configured scan operation on a table. | ||
class ICEBERG_EXPORT TableScan { | ||
public: | ||
/// \brief Scan context holding snapshot and scan-specific metadata. | ||
struct ScanContext { | ||
std::shared_ptr<Snapshot> snapshot; ///< Snapshot to scan. | ||
std::shared_ptr<Schema> projected_schema; ///< Projected schema. | ||
std::shared_ptr<Expression> filter; ///< Filter expression to apply. | ||
}; | ||
|
||
/// \brief Constructs a TableScan with the given context and file I/O. | ||
/// \param context Scan context including snapshot, schema, and filter. | ||
/// \param file_io File I/O instance for reading manifests and data files. | ||
TableScan(ScanContext context, std::shared_ptr<FileIO> file_io); | ||
|
||
/// \brief Plans the scan tasks by resolving manifests and data files. | ||
/// | ||
/// Returns a list of file scan tasks if successful. | ||
/// \return A Result containing scan tasks or an error. | ||
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const; | ||
|
||
private: | ||
ScanContext context_; | ||
std::shared_ptr<FileIO> file_io_; | ||
}; | ||
|
||
/// \brief Represents a task to scan a portion of a data file. | ||
struct ICEBERG_EXPORT FileScanTask { | ||
gty404 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before defining I'm not sure if we want to take the complexity to support the java |
||
std::shared_ptr<DataFile> data_file; ///< Data file metadata. | ||
uint64_t start; ///< Start byte offset. | ||
uint64_t length; ///< Length in bytes to scan. | ||
}; | ||
|
||
} // namespace iceberg |
Uh oh!
There was an error while loading. Please reload this page.