diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ed3a693e..b2639465 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -31,6 +31,8 @@ set(ICEBERG_SOURCES name_mapping.cc partition_field.cc partition_spec.cc + row/arrow_array_wrapper.cc + row/manifest_wrapper.cc schema.cc schema_field.cc schema_internal.cc @@ -105,6 +107,7 @@ iceberg_install_all_headers(iceberg) add_subdirectory(catalog) add_subdirectory(expression) +add_subdirectory(row) add_subdirectory(util) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 126bd755..feff82f1 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -27,6 +27,7 @@ #include "iceberg/manifest_list.h" #include "iceberg/schema.h" #include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -37,7 +38,7 @@ namespace iceberg { } #define PARSE_PRIMITIVE_FIELD(item, array_view, type) \ - for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ + for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx); \ item = static_cast(value); \ @@ -48,7 +49,7 @@ namespace iceberg { } #define PARSE_STRING_FIELD(item, array_view) \ - for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ + for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx); \ item = std::string(value.data, value.size_bytes); \ @@ -59,7 +60,7 @@ namespace iceberg { } #define PARSE_BINARY_FIELD(item, array_view) \ - for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \ + for (int64_t row_idx = 0; row_idx < array_view->length; 
row_idx++) { \ - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ + if (!ArrowArrayViewIsNull(array_view, row_idx)) { \ item = ArrowArrayViewGetInt8Vector(array_view, row_idx); \ } else if (required) { \ @@ -225,66 +226,67 @@ Result> ParseManifestList(ArrowSchema* schema, auto field_name = field.value()->get().name(); bool required = !field.value()->get().optional(); auto view_of_column = array_view.children[idx]; - switch (idx) { - case 0: + ICEBERG_ASSIGN_OR_RAISE(auto manifest_file_field, ManifestFileFieldFromIndex(idx)); + switch (manifest_file_field) { + case ManifestFileField::kManifestPath: PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, view_of_column); break; - case 1: + case ManifestFileField::kManifestLength: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, view_of_column, int64_t); break; - case 2: + case ManifestFileField::kPartitionSpecId: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, view_of_column, int32_t); break; - case 3: + case ManifestFileField::kContent: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column, ManifestFile::Content); break; - case 4: + case ManifestFileField::kSequenceNumber: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column, int64_t); break; - case 5: + case ManifestFileField::kMinSequenceNumber: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, view_of_column, int64_t); break; - case 6: + case ManifestFileField::kAddedSnapshotId: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, view_of_column, int64_t); break; - case 7: + case ManifestFileField::kAddedFilesCount: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, view_of_column, int32_t); break; - case 8: + case ManifestFileField::kExistingFilesCount: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count, view_of_column, int32_t); break; - case 9: + case ManifestFileField::kDeletedFilesCount: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, view_of_column,
int32_t); break; - case 10: + case ManifestFileField::kAddedRowsCount: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, view_of_column, int64_t); break; - case 11: + case ManifestFileField::kExistingRowsCount: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, view_of_column, int64_t); break; - case 12: + case ManifestFileField::kDeletedRowsCount: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, view_of_column, int64_t); break; - case 13: + case ManifestFileField::kPartitionFieldSummary: ICEBERG_RETURN_UNEXPECTED( ParsePartitionFieldSummaryList(view_of_column, manifest_files)); break; - case 14: + case ManifestFileField::kKeyMetadata: PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, view_of_column); break; - case 15: + case ManifestFileField::kFirstRowId: PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, view_of_column, int64_t); break; @@ -295,7 +297,7 @@ Result> ParseManifestList(ArrowSchema* schema, return manifest_files; } -Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx, +Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx, std::vector& manifest_entries) { if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) { auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx); @@ -355,7 +357,7 @@ Status ParseDataFile(const std::shared_ptr& data_file_schema, view_of_file_field); break; case 2: - for (size_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) { + for (int64_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) { if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) { auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, row_idx); std::string_view path_str(value.data, value.size_bytes); @@ -510,7 +512,7 @@ Result> ParseManifestEntry(ArrowSchema* schema, break; case 4: { auto data_file_schema = - dynamic_pointer_cast(field.value()->get().type()); + 
internal::checked_pointer_cast(field.value()->get().type()); ICEBERG_RETURN_UNEXPECTED( ParseDataFile(data_file_schema, view_of_column, manifest_entries)); break; @@ -571,4 +573,11 @@ Result> ManifestListReaderImpl::Files() const { return manifest_files; } +Result ManifestFileFieldFromIndex(int32_t index) { + if (index >= 0 && index < static_cast(ManifestFileField::kNextUnusedId)) { + return static_cast(index); + } + return InvalidArgument("Invalid manifest file field index: {}", index); +} + } // namespace iceberg diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h index 3144d078..13e3d2a8 100644 --- a/src/iceberg/manifest_reader_internal.h +++ b/src/iceberg/manifest_reader_internal.h @@ -60,4 +60,28 @@ class ManifestListReaderImpl : public ManifestListReader { std::unique_ptr reader_; }; +enum class ManifestFileField : int32_t { + kManifestPath = 0, + kManifestLength = 1, + kPartitionSpecId = 2, + kContent = 3, + kSequenceNumber = 4, + kMinSequenceNumber = 5, + kAddedSnapshotId = 6, + kAddedFilesCount = 7, + kExistingFilesCount = 8, + kDeletedFilesCount = 9, + kAddedRowsCount = 10, + kExistingRowsCount = 11, + kDeletedRowsCount = 12, + kPartitionFieldSummary = 13, + kKeyMetadata = 14, + kFirstRowId = 15, + // kNextUnusedId is the placeholder for the next unused index. + // Always keep this as the last index when adding new fields. + kNextUnusedId = 16, +}; + +Result ManifestFileFieldFromIndex(int32_t index); + } // namespace iceberg diff --git a/src/iceberg/row/CMakeLists.txt b/src/iceberg/row/CMakeLists.txt new file mode 100644 index 00000000..8deb6d23 --- /dev/null +++ b/src/iceberg/row/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +iceberg_install_all_headers(iceberg/row) diff --git a/src/iceberg/row/arrow_array_wrapper.cc b/src/iceberg/row/arrow_array_wrapper.cc new file mode 100644 index 00000000..e97293bc --- /dev/null +++ b/src/iceberg/row/arrow_array_wrapper.cc @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/row/arrow_array_wrapper.h" + +#include + +#include + +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +#define NANOARROW_RETURN_IF_NOT_OK(status) \ + if (status != NANOARROW_OK) [[unlikely]] { \ + return InvalidArrowData("Nanoarrow error: {}", error.message); \ + } + +namespace { + +// TODO(gangwu): Reuse created ArrowArrayStructLike and others with cache. +Result ExtractValue(const ArrowSchema* schema, const ArrowArray* array, + const ArrowArrayView* array_view, int64_t index) { + if (ArrowArrayViewIsNull(array_view, index)) { + return std::monostate{}; + } + + switch (array_view->storage_type) { + case NANOARROW_TYPE_BOOL: + return static_cast(ArrowArrayViewGetIntUnsafe(array_view, index)); + case NANOARROW_TYPE_INT32: + case NANOARROW_TYPE_DATE32: + return static_cast(ArrowArrayViewGetIntUnsafe(array_view, index)); + case NANOARROW_TYPE_INT64: + case NANOARROW_TYPE_TIME64: + case NANOARROW_TYPE_TIMESTAMP: + return ArrowArrayViewGetIntUnsafe(array_view, index); + case NANOARROW_TYPE_FLOAT: + return static_cast(ArrowArrayViewGetDoubleUnsafe(array_view, index)); + case NANOARROW_TYPE_DOUBLE: + return ArrowArrayViewGetDoubleUnsafe(array_view, index); + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_BINARY: + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + case NANOARROW_TYPE_STRING_VIEW: + case NANOARROW_TYPE_BINARY_VIEW: + case NANOARROW_TYPE_LARGE_STRING: + case NANOARROW_TYPE_LARGE_BINARY: { + ArrowStringView value = ArrowArrayViewGetStringUnsafe(array_view, index); + return std::string_view(value.data, value.size_bytes); + } + case NANOARROW_TYPE_DECIMAL128: { + ArrowError error; + ArrowSchemaView schema_view; + NANOARROW_RETURN_IF_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, &error)); + ArrowDecimal value; + ArrowDecimalInit(&value, schema_view.decimal_bitwidth, + schema_view.decimal_precision, schema_view.decimal_scale); + 
ArrowArrayViewGetDecimalUnsafe(array_view, index, &value); + if (value.n_words != 2) { + return InvalidArrowData("Unsupported Arrow decimal words: {}", value.n_words); + } + int128_t int_value{0}; + std::memcpy(&int_value, value.words, sizeof(int128_t)); + return Decimal(int_value); + } + case NANOARROW_TYPE_STRUCT: { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr struct_like, + ArrowArrayStructLike::Make(*schema, *array, index)); + return struct_like; + } + case NANOARROW_TYPE_LIST: { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr array_like, + ArrowArrayArrayLike::Make(*schema, *array, index)); + return array_like; + } + case NANOARROW_TYPE_MAP: { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr map_like, + ArrowArrayMapLike::Make(*schema, *array, index)); + return map_like; + } + case NANOARROW_TYPE_EXTENSION: + // TODO(gangwu): Handle these types properly + default: + return NotImplemented("Unsupported Arrow type: {}", + static_cast(array_view->storage_type)); + } +} + +} // namespace + +// ArrowArrayStructLike Implementation + +class ArrowArrayStructLike::Impl { + public: + Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) + : schema_(schema), array_(std::cref(array)), row_index_(row_index) {} + + ~Impl() = default; + + Result GetField(size_t pos) const { + // NOLINTNEXTLINE(modernize-use-integer-sign-comparison) + if (pos >= static_cast(schema_.n_children)) { + return InvalidArgument("Field index {} out of range (size: {})", pos, + schema_.n_children); + } + + if (row_index_ < 0 || row_index_ >= array_.get().length) { + return InvalidArgument("Row index {} out of range (length: {})", row_index_, + array_.get().length); + } + + const ArrowSchema* child_schema = schema_.children[pos]; + const ArrowArray* child_array = array_.get().children[pos]; + const ArrowArrayView* child_view = array_view_.children[pos]; + + return ExtractValue(child_schema, child_array, child_view, row_index_); + } + + size_t num_fields() const { return 
static_cast(schema_.n_children); } + + Status Reset(const ArrowArray& array, int64_t row_index) { + array_ = std::cref(array); + row_index_ = row_index; + + ArrowError error; + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewSetArray(&array_view_, &array_.get(), &error)); + return {}; + } + + Status Reset(int64_t row_index) { + row_index_ = row_index; + return {}; + } + + Status Init() { + ArrowError error; + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error)); + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewSetArray(&array_view_, &array_.get(), &error)); + return {}; + } + + private: + ArrowArrayView array_view_; + internal::ArrowArrayViewGuard array_view_guard_{&array_view_}; + + const ArrowSchema& schema_; + std::reference_wrapper array_; + int64_t row_index_; +}; + +Result> ArrowArrayStructLike::Make( + const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) { + auto impl = std::make_unique(schema, array, row_index); + ICEBERG_RETURN_UNEXPECTED(impl->Init()); + return std::unique_ptr(new ArrowArrayStructLike(std::move(impl))); +} + +ArrowArrayStructLike::ArrowArrayStructLike(std::unique_ptr impl) + : impl_(std::move(impl)) {} + +ArrowArrayStructLike::~ArrowArrayStructLike() = default; + +Result ArrowArrayStructLike::GetField(size_t pos) const { + return impl_->GetField(pos); +} + +size_t ArrowArrayStructLike::num_fields() const { return impl_->num_fields(); } + +Status ArrowArrayStructLike::Reset(int64_t row_index) { return impl_->Reset(row_index); } + +Status ArrowArrayStructLike::Reset(const ArrowArray& array, int64_t row_index) { + return impl_->Reset(array, row_index); +} + +// ArrowArrayArrayLike Implementation + +class ArrowArrayArrayLike::Impl { + public: + Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) + : schema_(schema), array_(std::cref(array)), row_index_(row_index) {} + + ~Impl() = default; + + Result GetElement(size_t pos) const { + // 
NOLINTNEXTLINE(modernize-use-integer-sign-comparison) + if (pos >= static_cast(length_)) { + return InvalidArgument("Element index {} out of range (length: {})", pos, length_); + } + + const ArrowSchema* child_schema = schema_.children[0]; + const ArrowArray* child_array = array_.get().children[0]; + const ArrowArrayView* child_view = array_view_.children[0]; + + return ExtractValue(child_schema, child_array, child_view, + offset_ + static_cast(pos)); + } + + size_t size() const { return static_cast(length_); } + + Status Reset(const ArrowArray& array, int64_t row_index) { + array_ = std::cref(array); + row_index_ = row_index; + + ArrowError error; + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewSetArray(&array_view_, &array_.get(), &error)); + return UpdateOffsets(); + } + + Status Reset(int64_t row_index) { + row_index_ = row_index; + return UpdateOffsets(); + } + + Status Init() { + ArrowError error; + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error)); + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewSetArray(&array_view_, &array_.get(), &error)); + return UpdateOffsets(); + } + + private: + Status UpdateOffsets() { + if (row_index_ < 0 || row_index_ >= array_.get().length) { + return InvalidArgument("Row index {} out of range (length: {})", row_index_, + array_.get().length); + } + + offset_ = ArrowArrayViewListChildOffset(&array_view_, row_index_); + length_ = ArrowArrayViewListChildOffset(&array_view_, row_index_ + 1) - offset_; + return {}; + } + + ArrowArrayView array_view_; + internal::ArrowArrayViewGuard array_view_guard_{&array_view_}; + + const ArrowSchema& schema_; + std::reference_wrapper array_; + int64_t row_index_; + + int64_t offset_ = 0; + int64_t length_ = 0; +}; + +Result> ArrowArrayArrayLike::Make( + const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) { + auto impl = std::make_unique(schema, array, row_index); + ICEBERG_RETURN_UNEXPECTED(impl->Init()); + return std::unique_ptr(new 
ArrowArrayArrayLike(std::move(impl))); +} + +ArrowArrayArrayLike::ArrowArrayArrayLike(std::unique_ptr impl) + : impl_(std::move(impl)) {} + +ArrowArrayArrayLike::~ArrowArrayArrayLike() = default; + +Result ArrowArrayArrayLike::GetElement(size_t pos) const { + return impl_->GetElement(pos); +} + +size_t ArrowArrayArrayLike::size() const { return impl_->size(); } + +Status ArrowArrayArrayLike::Reset(int64_t row_index) { return impl_->Reset(row_index); } + +Status ArrowArrayArrayLike::Reset(const ArrowArray& array, int64_t row_index) { + return impl_->Reset(array, row_index); +} + +// ArrowArrayMapLike Implementation + +class ArrowArrayMapLike::Impl { + public: + Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) + : schema_(schema), array_(std::cref(array)), row_index_(row_index) {} + + ~Impl() = default; + + Result GetKey(size_t pos) const { + // NOLINTNEXTLINE(modernize-use-integer-sign-comparison) + if (pos >= static_cast(length_)) { + return InvalidArgument("Key index {} out of range (length: {})", pos, length_); + } + + const ArrowSchema* keys_schema = schema_.children[0]->children[0]; + const ArrowArray* keys_array = array_.get().children[0]->children[0]; + const ArrowArrayView* keys_view = array_view_.children[0]->children[0]; + + return ExtractValue(keys_schema, keys_array, keys_view, + offset_ + static_cast(pos)); + } + + Result GetValue(size_t pos) const { + // NOLINTNEXTLINE(modernize-use-integer-sign-comparison) + if (pos >= static_cast(length_)) { + return InvalidArgument("Value index {} out of range (length: {})", pos, length_); + } + + const ArrowSchema* values_schema = schema_.children[0]->children[1]; + const ArrowArray* values_array = array_.get().children[0]->children[1]; + const ArrowArrayView* values_view = array_view_.children[0]->children[1]; + + return ExtractValue(values_schema, values_array, values_view, + offset_ + static_cast(pos)); + } + + size_t size() const { return static_cast(length_); } + + Status Reset(const 
ArrowArray& array, int64_t row_index) { + array_ = std::cref(array); + row_index_ = row_index; + + ArrowError error; + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewSetArray(&array_view_, &array_.get(), &error)); + return UpdateOffsets(); + } + + Status Reset(int64_t row_index) { + row_index_ = row_index; + return UpdateOffsets(); + } + + Status Init() { + ArrowError error; + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error)); + NANOARROW_RETURN_IF_NOT_OK( + ArrowArrayViewSetArray(&array_view_, &array_.get(), &error)); + return UpdateOffsets(); + } + + private: + Status UpdateOffsets() { + if (row_index_ < 0 || row_index_ >= array_.get().length) { + return InvalidArgument("Row index {} out of range (length: {})", row_index_, + array_.get().length); + } + + // XXX: ArrowArrayViewListChildOffset does not work for map types. + // We need to directly access the offsets buffer instead. + auto* offsets_buffer = array_view_.buffer_views[1].data.as_int32; + offset_ = offsets_buffer[row_index_]; + length_ = offsets_buffer[row_index_ + 1] - offset_; + + return {}; + } + + ArrowArrayView array_view_; + internal::ArrowArrayViewGuard array_view_guard_{&array_view_}; + + const ArrowSchema& schema_; + std::reference_wrapper array_; + int64_t row_index_; + + int64_t offset_ = 0; + int64_t length_ = 0; +}; + +Result> ArrowArrayMapLike::Make( + const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) { + auto impl = std::make_unique(schema, array, row_index); + ICEBERG_RETURN_UNEXPECTED(impl->Init()); + return std::unique_ptr(new ArrowArrayMapLike(std::move(impl))); +} + +ArrowArrayMapLike::ArrowArrayMapLike(std::unique_ptr impl) + : impl_(std::move(impl)) {} + +ArrowArrayMapLike::~ArrowArrayMapLike() = default; + +Result ArrowArrayMapLike::GetKey(size_t pos) const { return impl_->GetKey(pos); } + +Result ArrowArrayMapLike::GetValue(size_t pos) const { + return impl_->GetValue(pos); +} + +size_t ArrowArrayMapLike::size() const { 
return impl_->size(); } + +Status ArrowArrayMapLike::Reset(int64_t row_index) { return impl_->Reset(row_index); } + +Status ArrowArrayMapLike::Reset(const ArrowArray& array, int64_t row_index) { + return impl_->Reset(array, row_index); +} + +} // namespace iceberg diff --git a/src/iceberg/row/arrow_array_wrapper.h b/src/iceberg/row/arrow_array_wrapper.h new file mode 100644 index 00000000..7316badd --- /dev/null +++ b/src/iceberg/row/arrow_array_wrapper.h @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/row/arrow_array_wrapper.h +/// Wrapper classes for ArrowArray that implement StructLike, ArrayLike, and MapLike +/// interfaces for unified row-oriented data access from columnar ArrowArray data. + +#include "iceberg/arrow_c_data.h" +#include "iceberg/row/struct_like.h" + +namespace iceberg { + +/// \brief Wrapper for one row of a struct-typed ArrowArray. 
+class ICEBERG_EXPORT ArrowArrayStructLike : public StructLike { + public: + ~ArrowArrayStructLike() override; + + Result GetField(size_t pos) const override; + + size_t num_fields() const override; + + Status Reset(int64_t row_index); + + Status Reset(const ArrowArray& array, int64_t row_index = 0); + + static Result> Make(const ArrowSchema& schema, + const ArrowArray& array, + int64_t row_index = 0); + + ArrowArrayStructLike(const ArrowArrayStructLike&) = delete; + ArrowArrayStructLike& operator=(const ArrowArrayStructLike&) = delete; + + private: + class Impl; + explicit ArrowArrayStructLike(std::unique_ptr impl); + + std::unique_ptr impl_; +}; + +/// \brief Wrapper for one row of a list-typed ArrowArray. +class ICEBERG_EXPORT ArrowArrayArrayLike : public ArrayLike { + public: + ~ArrowArrayArrayLike() override; + + Result GetElement(size_t pos) const override; + + size_t size() const override; + + Status Reset(int64_t row_index); + + Status Reset(const ArrowArray& array, int64_t row_index = 0); + + static Result> Make(const ArrowSchema& schema, + const ArrowArray& array, + int64_t row_index = 0); + + ArrowArrayArrayLike(const ArrowArrayArrayLike& other) = delete; + ArrowArrayArrayLike& operator=(const ArrowArrayArrayLike& other) = delete; + + private: + class Impl; + explicit ArrowArrayArrayLike(std::unique_ptr impl); + + std::unique_ptr impl_; +}; + +/// \brief Wrapper for one row of a map-typed ArrowArray. 
+class ICEBERG_EXPORT ArrowArrayMapLike : public MapLike { + public: + ~ArrowArrayMapLike() override; + + Result GetKey(size_t pos) const override; + + Result GetValue(size_t pos) const override; + + size_t size() const override; + + Status Reset(int64_t row_index); + + Status Reset(const ArrowArray& array, int64_t row_index = 0); + + static Result> Make(const ArrowSchema& schema, + const ArrowArray& array, + int64_t row_index = 0); + + ArrowArrayMapLike(const ArrowArrayMapLike& other) = delete; + ArrowArrayMapLike& operator=(const ArrowArrayMapLike& other) = delete; + + private: + class Impl; + explicit ArrowArrayMapLike(std::unique_ptr impl); + + std::unique_ptr impl_; +}; + +} // namespace iceberg diff --git a/src/iceberg/row/manifest_wrapper.cc b/src/iceberg/row/manifest_wrapper.cc new file mode 100644 index 00000000..4c8708e4 --- /dev/null +++ b/src/iceberg/row/manifest_wrapper.cc @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/row/manifest_wrapper.h" + +#include "iceberg/manifest_reader_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { +template + requires std::is_same_v> || std::is_same_v +std::string_view ToView(const T& value) { + return {reinterpret_cast(value.data()), value.size()}; // NOLINT +} + +template +Result FromOptional(const std::optional& value) { + if (value.has_value()) { + return value.value(); + } + return std::monostate{}; +} + +} // namespace + +Result PartitionFieldSummaryStructLike::GetField(size_t pos) const { + if (pos >= num_fields()) { + return InvalidArgument("Invalid partition field summary index: {}", pos); + } + switch (pos) { + case 0: + return summary_.get().contains_null; + case 1: + return FromOptional(summary_.get().contains_nan); + case 2: + return FromOptional( + summary_.get().lower_bound.transform(ToView>)); + case 3: + return FromOptional( + summary_.get().upper_bound.transform(ToView>)); + default: + return InvalidArgument("Invalid partition field summary index: {}", pos); + } +} + +Result PartitionFieldSummaryArrayLike::GetElement(size_t pos) const { + if (pos >= size()) { + return InvalidArgument("Invalid partition field summary index: {}", pos); + } + if (summary_ == nullptr) { + summary_ = std::make_shared(summaries_.get()[pos]); + } else { + summary_->Reset(summaries_.get()[pos]); + } + return summary_; +} + +Result ManifestFileStructLike::GetField(size_t pos) const { + if (pos >= num_fields()) { + return InvalidArgument("Invalid manifest file field index: {}", pos); + } + ICEBERG_ASSIGN_OR_RAISE(auto field, + ManifestFileFieldFromIndex(static_cast(pos))); + const auto& manifest_file = manifest_file_.get(); + switch (field) { + case ManifestFileField::kManifestPath: + return ToView(manifest_file.manifest_path); + case ManifestFileField::kManifestLength: + return manifest_file.manifest_length; + case ManifestFileField::kPartitionSpecId: + return manifest_file.partition_spec_id; + 
case ManifestFileField::kContent: + return static_cast(manifest_file.content); + case ManifestFileField::kSequenceNumber: + return manifest_file.sequence_number; + case ManifestFileField::kMinSequenceNumber: + return manifest_file.min_sequence_number; + case ManifestFileField::kAddedSnapshotId: + return manifest_file.added_snapshot_id; + case ManifestFileField::kAddedFilesCount: + return FromOptional(manifest_file.added_files_count); + case ManifestFileField::kExistingFilesCount: + return FromOptional(manifest_file.existing_files_count); + case ManifestFileField::kDeletedFilesCount: + return FromOptional(manifest_file.deleted_files_count); + case ManifestFileField::kAddedRowsCount: + return FromOptional(manifest_file.added_rows_count); + case ManifestFileField::kExistingRowsCount: + return FromOptional(manifest_file.existing_rows_count); + case ManifestFileField::kDeletedRowsCount: + return FromOptional(manifest_file.deleted_rows_count); + case ManifestFileField::kPartitionFieldSummary: { + if (summaries_ == nullptr) { + summaries_ = + std::make_shared(manifest_file.partitions); + } else { + summaries_->Reset(manifest_file.partitions); + } + return summaries_; + } + case ManifestFileField::kKeyMetadata: + return ToView(manifest_file.key_metadata); + case ManifestFileField::kFirstRowId: + return FromOptional(manifest_file.first_row_id); + case ManifestFileField::kNextUnusedId: + return InvalidArgument("Invalid manifest file field index: {}", pos); + } + return InvalidArgument("Invalid manifest file field index: {}", pos); +} + +size_t ManifestFileStructLike::num_fields() const { + return static_cast(ManifestFileField::kNextUnusedId); +} + +std::unique_ptr FromManifestFile(const ManifestFile& file) { + return std::make_unique(file); +} + +} // namespace iceberg diff --git a/src/iceberg/row/manifest_wrapper.h b/src/iceberg/row/manifest_wrapper.h new file mode 100644 index 00000000..3271eac4 --- /dev/null +++ b/src/iceberg/row/manifest_wrapper.h @@ -0,0 +1,100 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/row/manifest_wrapper.h +/// Wrapper classes for manifest-related data structures that implement +/// StructLike, ArrayLike, and MapLike interfaces for unified data access. + +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest_list.h" +#include "iceberg/row/struct_like.h" + +namespace iceberg { + +/// \brief StructLike wrapper for PartitionFieldSummary. +class ICEBERG_EXPORT PartitionFieldSummaryStructLike : public StructLike { + public: + explicit PartitionFieldSummaryStructLike(const PartitionFieldSummary& summary) + : summary_(summary) {} + ~PartitionFieldSummaryStructLike() override = default; + + PartitionFieldSummaryStructLike(const PartitionFieldSummaryStructLike&) = delete; + PartitionFieldSummaryStructLike& operator=(const PartitionFieldSummaryStructLike&) = + delete; + + Result GetField(size_t pos) const override; + + size_t num_fields() const override { return 4; } + + void Reset(const PartitionFieldSummary& summary) { summary_ = std::cref(summary); } + + private: + std::reference_wrapper summary_; +}; + +/// \brief ArrayLike wrapper for a vector of PartitionFieldSummary.
+class ICEBERG_EXPORT PartitionFieldSummaryArrayLike : public ArrayLike { + public: + explicit PartitionFieldSummaryArrayLike( + const std::vector& summaries) + : summaries_(summaries) {} + ~PartitionFieldSummaryArrayLike() override = default; + + PartitionFieldSummaryArrayLike(const PartitionFieldSummaryArrayLike&) = delete; + PartitionFieldSummaryArrayLike& operator=(const PartitionFieldSummaryArrayLike&) = + delete; + + Result GetElement(size_t pos) const override; + + size_t size() const override { return summaries_.get().size(); } + + void Reset(const std::vector& summaries) { + summaries_ = std::cref(summaries); + } + + private: + std::reference_wrapper> summaries_; + mutable std::shared_ptr summary_; +}; + +/// \brief StructLike wrapper for ManifestFile. +/// +/// Only a reference to the ManifestFile is stored, so it has to stay alive for +/// as long as the wrapper is used. The wrapper is non-copyable. +class ICEBERG_EXPORT ManifestFileStructLike : public StructLike { + public: + /// \brief Bind the wrapper to \p file without copying it. + explicit ManifestFileStructLike(const ManifestFile& file) : manifest_file_(file) {} + ~ManifestFileStructLike() override = default; + + ManifestFileStructLike(const ManifestFileStructLike&) = delete; + ManifestFileStructLike& operator=(const ManifestFileStructLike&) = delete; + + /// \brief Get the field value at the given position. + /// \param pos The position of the field; callers in the tests pass a + /// ManifestFileField enumerator converted to an index. + Result GetField(size_t pos) const override; + + /// \brief Get the number of fields in the struct. + size_t num_fields() const override; + + /// \brief Rebind the wrapper to another ManifestFile; \p file is not copied. + void Reset(const ManifestFile& file) { manifest_file_ = std::cref(file); } + + private: + /// Non-owning reference to the wrapped ManifestFile. + std::reference_wrapper manifest_file_; + /// Array wrapper for the partition summaries. It is mutable so that the + /// const GetField can create it on first use and reuse it afterwards. + mutable std::shared_ptr summaries_; +}; + +} // namespace iceberg diff --git a/src/iceberg/row/struct_like.h b/src/iceberg/row/struct_like.h new file mode 100644 index 00000000..3093f752 --- /dev/null +++ b/src/iceberg/row/struct_like.h @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/row/struct_like.h +/// Structures for viewing data in a row-based format. This header contains the +/// definitions of StructLike, ArrayLike, and MapLike, which provide a unified +/// interface for viewing data from ArrowArray or structs like ManifestFile and +/// ManifestEntry. Note that they do not carry type information and should be +/// used in conjunction with the schema to get the type information. + +#include +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/decimal.h" + +namespace iceberg { + +/// \brief A scalar value depending on its data type. +/// +/// Note that all string and binary values are stored as non-owned string_view +/// and their lifetime is managed by the wrapped object. +using Scalar = std::variant, // for struct + std::shared_ptr, // for list + std::shared_ptr>; // for map + +/// \brief An immutable struct-like wrapper. +class ICEBERG_EXPORT StructLike { + public: + virtual ~StructLike() = default; + + /// \brief Get the field value at the given position. + /// \param pos The position of the field in the struct. + /// \return The value at \p pos wrapped in a Result. ManifestFileStructLike, + /// for example, answers an unknown position with an InvalidArgument error. + virtual Result GetField(size_t pos) const = 0; + + /// \brief Get the number of fields in the struct. + virtual size_t num_fields() const = 0; +}; + +/// \brief An immutable array-like wrapper. +class ICEBERG_EXPORT ArrayLike { + public: + virtual ~ArrayLike() = default; + + /// \brief Get the array element at the given position. + /// \param pos The position of the element in the array. + /// \return The element at \p pos wrapped in a Result. 
+ virtual Result GetElement(size_t pos) const = 0; + + /// \brief Get the number of elements in the array. + virtual size_t size() const = 0; +}; + +/// \brief An immutable map-like wrapper. +/// +/// NOTE(review): GetKey(pos) and GetValue(pos) presumably address the same +/// entry for equal positions (the tests read them that way) -- confirm. +class ICEBERG_EXPORT MapLike { + public: + virtual ~MapLike() = default; + + /// \brief Get the key at the given position. + /// \param pos The position of the key in the map. + /// \return The key at \p pos wrapped in a Result. + virtual Result GetKey(size_t pos) const = 0; + + /// \brief Get the value at the given position. + /// \param pos The position of the value in the map. + /// \return The value at \p pos wrapped in a Result. + virtual Result GetValue(size_t pos) const = 0; + + /// \brief Get the number of entries in the map. + virtual size_t size() const = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index cb3b6082..db9caab3 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -110,10 +110,11 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(arrow_test USE_BUNDLE SOURCES - arrow_test.cc arrow_fs_file_io_test.cc + arrow_test.cc + gzip_decompress_test.cc metadata_io_test.cc - gzip_decompress_test.cc) + struct_like_test.cc) add_iceberg_test(catalog_test USE_BUNDLE diff --git a/src/iceberg/test/struct_like_test.cc b/src/iceberg/test/struct_like_test.cc new file mode 100644 index 00000000..484b2543 --- /dev/null +++ b/src/iceberg/test/struct_like_test.cc @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_reader_internal.h" +#include "iceberg/row/arrow_array_wrapper.h" +#include "iceberg/row/manifest_wrapper.h" +#include "iceberg/schema_internal.h" +#include "matchers.h" + +namespace iceberg { + +// Asserts that `result` is OK, that its value holds the expected alternative, +// and that the held value equals `expected_value`. +// NOTE: `result` is expanded twice (once for IsOk, once for value()), so an +// expression passed here, e.g. a GetField call, is evaluated twice. +#define EXPECT_SCALAR_EQ(result, expected_type, expected_value) \ + do { \ + ASSERT_THAT(result, IsOk()); \ + auto scalar = result.value(); \ + ASSERT_TRUE(std::holds_alternative(scalar)); \ + EXPECT_EQ(std::get(scalar), expected_value); \ + } while (0) + +// Asserts that `result` is OK and that the held decimal, rendered with +// ToString(scale), equals `expected_value`. `result` is expanded twice here too. +#define EXPECT_DECIMAL_EQ(result, scale, expected_value) \ + do { \ + ASSERT_THAT(result, IsOk()); \ + auto scalar = result.value(); \ + ASSERT_TRUE(std::holds_alternative(scalar)); \ + auto decimal = std::get(scalar); \ + EXPECT_EQ(decimal.ToString(scale), expected_value); \ + } while (0) + +// Asserts that `result` is OK and that its value holds the alternative used +// for null. `result` is expanded twice here too. +#define EXPECT_SCALAR_NULL(result) \ + do { \ + ASSERT_THAT(result, IsOk()); \ + auto scalar = result.value(); \ + ASSERT_TRUE(std::holds_alternative(scalar)); \ + } while (0) + +// Reads a handful of required fields back through the wrapper and checks that +// an out-of-range position is rejected. +TEST(ManifestFileStructLike, BasicFields) { + ManifestFile manifest_file{ + .manifest_path = "/path/to/manifest.avro", + .manifest_length = 12345, + .partition_spec_id = 1, + .content = ManifestFile::Content::kData, + .sequence_number = 100, + .min_sequence_number = 90, + .added_snapshot_id = 1001, + .added_files_count = 10, + .existing_files_count = 5, + .deleted_files_count = 2, + .added_rows_count = 1000, + .existing_rows_count = 500, + .deleted_rows_count = 20, + }; + + 
ManifestFileStructLike struct_like(manifest_file); + EXPECT_EQ(struct_like.num_fields(), 16); /* num_fields() reports the value of ManifestFileField::kNextUnusedId */ + + EXPECT_SCALAR_EQ( + struct_like.GetField(static_cast(ManifestFileField::kManifestPath)), + std::string_view, "/path/to/manifest.avro"); + EXPECT_SCALAR_EQ( + struct_like.GetField(static_cast(ManifestFileField::kManifestLength)), + int64_t, 12345); + EXPECT_SCALAR_EQ( + struct_like.GetField(static_cast(ManifestFileField::kPartitionSpecId)), + int32_t, 1); + EXPECT_SCALAR_EQ(struct_like.GetField(static_cast(ManifestFileField::kContent)), + int32_t, static_cast(ManifestFile::Content::kData)); + EXPECT_SCALAR_EQ( + struct_like.GetField(static_cast(ManifestFileField::kSequenceNumber)), + int64_t, 100); + EXPECT_SCALAR_EQ( + struct_like.GetField(static_cast(ManifestFileField::kAddedFilesCount)), + int32_t, 10); + EXPECT_THAT(struct_like.GetField(100), IsError(ErrorKind::kInvalidArgument)); /* 100 is past the last field */ +} + +TEST(ManifestFileStructLike, OptionalFields) { + ManifestFile manifest_file{.manifest_path = "/path/to/manifest2.avro", + .manifest_length = 54321, + .partition_spec_id = 2, + .content = ManifestFile::Content::kDeletes, + .sequence_number = 200, + .min_sequence_number = 180, + .added_snapshot_id = 2001, + .added_files_count = std::nullopt, // null optional field + .existing_files_count = 15, + .deleted_files_count = std::nullopt, // null optional field + .added_rows_count = std::nullopt, // null optional field + .existing_rows_count = 1500, + .deleted_rows_count = 200, + .partitions = {}, + .key_metadata = {}, + .first_row_id = 12345}; + ManifestFileStructLike struct_like(manifest_file); + + // An optional left as std::nullopt must read back as a null scalar, while an + // optional that was set must read back as its value. + EXPECT_SCALAR_NULL( + struct_like.GetField(static_cast(ManifestFileField::kAddedFilesCount))); + EXPECT_SCALAR_EQ( + struct_like.GetField(static_cast(ManifestFileField::kExistingFilesCount)), + int32_t, 15); + EXPECT_SCALAR_EQ( + struct_like.GetField(static_cast(ManifestFileField::kFirstRowId)), int64_t, + 12345); + EXPECT_SCALAR_EQ(struct_like.GetField(static_cast(ManifestFileField::kContent)), 
+ int32_t, static_cast(ManifestFile::Content::kDeletes)); +} + +TEST(ManifestFileStructLike, WithPartitions) { + ManifestFile manifest_file{ + .manifest_path = "/path/to/manifest3.avro", + .manifest_length = 98765, + .partition_spec_id = 3, + .content = ManifestFile::Content::kData, + .sequence_number = 300, + .min_sequence_number = 290, + .added_snapshot_id = 3001, + .added_files_count = 20, + .existing_files_count = 10, + .deleted_files_count = 1, + .added_rows_count = 2000, + .existing_rows_count = 1000, + .deleted_rows_count = 10, + .partitions = {{.contains_null = true, + .contains_nan = false, + .lower_bound = std::vector{0x01, 0x02, 0x03}, + .upper_bound = std::vector{0x04, 0x05, 0x06}}, + {.contains_null = false, + .contains_nan = std::nullopt, + .lower_bound = std::vector{0x10, 0x20}, + .upper_bound = std::nullopt}}}; + + ManifestFileStructLike struct_like(manifest_file); + + auto partitions_result = struct_like.GetField( + static_cast(ManifestFileField::kPartitionFieldSummary)); + ASSERT_THAT(partitions_result, IsOk()); + auto partitions_scalar = partitions_result.value(); + ASSERT_TRUE(std::holds_alternative>(partitions_scalar)); + auto partitions_array = std::get>(partitions_scalar); + EXPECT_EQ(partitions_array->size(), 2); /* two summaries were configured above */ + + // Test 1st partition summary. The expectations below read contains_null, + // contains_nan and lower_bound at positions 0, 1 and 2 respectively. + auto first_partition_result = partitions_array->GetElement(0); + ASSERT_THAT(first_partition_result, IsOk()); + auto first_partition_scalar = first_partition_result.value(); + ASSERT_TRUE( + std::holds_alternative>(first_partition_scalar)); + auto first_partition_struct = + std::get>(first_partition_scalar); + EXPECT_EQ(first_partition_struct->num_fields(), 4); + EXPECT_SCALAR_EQ(first_partition_struct->GetField(0), bool, true); + EXPECT_SCALAR_EQ(first_partition_struct->GetField(1), bool, false); + auto lower_bound_result = first_partition_struct->GetField(2); + ASSERT_THAT(lower_bound_result, IsOk()); + auto lower_bound_scalar = lower_bound_result.value(); + 
ASSERT_TRUE(std::holds_alternative(lower_bound_scalar)); + auto lower_bound_view = std::get(lower_bound_scalar); + EXPECT_EQ(lower_bound_view.size(), 3); + EXPECT_EQ(static_cast(lower_bound_view[0]), 0x01); + EXPECT_EQ(static_cast(lower_bound_view[1]), 0x02); + EXPECT_EQ(static_cast(lower_bound_view[2]), 0x03); + + // Test 2nd partition summary with null fields + auto second_partition_result = partitions_array->GetElement(1); + ASSERT_THAT(second_partition_result, IsOk()); + auto second_partition_scalar = second_partition_result.value(); + ASSERT_TRUE( + std::holds_alternative>(second_partition_scalar)); + auto second_partition_struct = + std::get>(second_partition_scalar); + // Positions 1 and 3 are checked because contains_nan and upper_bound of the + // second summary were initialised with std::nullopt. + EXPECT_SCALAR_NULL(second_partition_struct->GetField(1)); + EXPECT_SCALAR_NULL(second_partition_struct->GetField(3)); +} + +// Builds a struct array with one column per primitive type from JSON and reads +// every column of every row back through ArrowArrayStructLike. +TEST(ArrowArrayStructLike, PrimitiveFields) { + auto struct_type = ::arrow::struct_( + {::arrow::field("id", ::arrow::int64(), /*nullable=*/false), + ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true), + ::arrow::field("score", ::arrow::float32(), /*nullable=*/true), + ::arrow::field("active", ::arrow::boolean(), /*nullable=*/false), + ::arrow::field("date", ::arrow::date32(), /*nullable=*/false), + ::arrow::field("time", ::arrow::time64(::arrow::TimeUnit::MICRO), + /*nullable=*/false), + ::arrow::field("timestamp", ::arrow::timestamp(::arrow::TimeUnit::MICRO), + /*nullable=*/false), + ::arrow::field("fixed", ::arrow::fixed_size_binary(4), /*nullable=*/false), + ::arrow::field("decimal", ::arrow::decimal128(10, 2), /*nullable=*/false)}); + + auto arrow_array = ::arrow::json::ArrayFromJSONString(struct_type, R"([ + {"id": 1, "name": "Alice", "score": 95.5, "active": true, "date": 1714396800, + "time": 123456, "timestamp": 1714396800000000, "fixed": "aaaa", "decimal": "1234.56"}, + {"id": 2, "name": "Bob", "score": null, "active": false, "date": 1714396801, + "time": 123457, "timestamp": 1714396800000001, "fixed": "bbbb", "decimal": "-1234.56"}, + {"id": 3, "name": 
null, "score": 87.2, "active": true, "date": 1714396802, + "time": 123458, "timestamp": 1714396800000002, "fixed": "cccc", "decimal": "1234.00"}])") + .ValueOrDie(); + + ArrowSchema c_schema; + ArrowArray c_array; + internal::ArrowSchemaGuard schema_guard(&c_schema); + internal::ArrowArrayGuard array_guard(&c_array); + ASSERT_TRUE(::arrow::ExportType(*struct_type, &c_schema).ok()); /* export the type into the C ArrowSchema struct */ + ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok()); /* export the data into the C ArrowArray struct */ + + auto struct_like_result = ArrowArrayStructLike::Make(c_schema, c_array); + ASSERT_THAT(struct_like_result, IsOk()); + auto struct_like = std::move(struct_like_result.value()); + + constexpr int64_t kNumRows = 3; /* one expected entry per JSON row above */ + std::array ids = {1, 2, 3}; + std::array, kNumRows> names = {"Alice", "Bob", std::nullopt}; + std::array, kNumRows> scores = {95.5f, std::nullopt, 87.2f}; + std::array actives = {true, false, true}; + std::array dates = {1714396800, 1714396801, 1714396802}; + std::array times = {123456, 123457, 123458}; + std::array timestamps = {1714396800000000, 1714396800000001, + 1714396800000002}; + std::array fixeds = {"aaaa", "bbbb", "cccc"}; + std::array decimals = {"1234.56", "-1234.56", "1234.00"}; + + for (int64_t i = 0; i < kNumRows; ++i) { + ASSERT_THAT(struct_like->Reset(i), IsOk()); /* Reset(i) is called before the fields of row i are checked */ + EXPECT_SCALAR_EQ(struct_like->GetField(0), int64_t, ids[i]); + if (names[i].has_value()) { + EXPECT_SCALAR_EQ(struct_like->GetField(1), std::string_view, names[i]); + } else { + EXPECT_SCALAR_NULL(struct_like->GetField(1)); + } + if (scores[i].has_value()) { + EXPECT_SCALAR_EQ(struct_like->GetField(2), float, scores[i].value()); + } else { + EXPECT_SCALAR_NULL(struct_like->GetField(2)); + } + EXPECT_SCALAR_EQ(struct_like->GetField(3), bool, actives[i]); + EXPECT_SCALAR_EQ(struct_like->GetField(4), int32_t, dates[i]); + EXPECT_SCALAR_EQ(struct_like->GetField(5), int64_t, times[i]); + EXPECT_SCALAR_EQ(struct_like->GetField(6), int64_t, timestamps[i]); + EXPECT_SCALAR_EQ(struct_like->GetField(7), std::string_view, fixeds[i]); + 
EXPECT_DECIMAL_EQ(struct_like->GetField(8), /*scale=*/2, decimals[i]); + } +} + +TEST(ArrowArrayStructLike, NestedStruct) { /* a struct column nested in the root struct is read back through a nested StructLike */ + auto person_type = + ::arrow::struct_({::arrow::field("name", ::arrow::utf8(), /*nullable=*/false), + ::arrow::field("age", ::arrow::int32(), /*nullable=*/false)}); + auto root_type = + ::arrow::struct_({::arrow::field("id", ::arrow::int64(), /*nullable=*/false), + ::arrow::field("person", person_type, /*nullable=*/false)}); + + auto arrow_array = ::arrow::json::ArrayFromJSONString(root_type, R"([ + {"id": 1, "person": {"name": "Alice", "age": 30}}, + {"id": 2, "person": {"name": "Bob", "age": 25}}])") + .ValueOrDie(); + + ArrowSchema c_schema; + ArrowArray c_array; + internal::ArrowSchemaGuard schema_guard(&c_schema); + internal::ArrowArrayGuard array_guard(&c_array); + ASSERT_TRUE(::arrow::ExportType(*root_type, &c_schema).ok()); + ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok()); + + auto struct_like_result = ArrowArrayStructLike::Make(c_schema, c_array); + ASSERT_THAT(struct_like_result, IsOk()); + auto struct_like = std::move(struct_like_result.value()); + + constexpr int64_t kNumRows = 2; + std::array ids = {1, 2}; + std::array names = {"Alice", "Bob"}; + std::array ages = {30, 25}; + + for (int64_t i = 0; i < kNumRows; ++i) { + ASSERT_THAT(struct_like->Reset(i), IsOk()); + EXPECT_EQ(struct_like->num_fields(), 2); + EXPECT_SCALAR_EQ(struct_like->GetField(0), int64_t, ids[i]); + + auto person_result = struct_like->GetField(1); /* field 1 is the nested "person" struct */ + ASSERT_THAT(person_result, IsOk()); + auto person_scalar = person_result.value(); + ASSERT_TRUE(std::holds_alternative>(person_scalar)); + + auto person_struct = std::get>(person_scalar); + EXPECT_EQ(person_struct->num_fields(), 2); + EXPECT_SCALAR_EQ(person_struct->GetField(0), std::string_view, names[i]); + EXPECT_SCALAR_EQ(person_struct->GetField(1), int32_t, ages[i]); + } +} + +TEST(ArrowArrayStructLike, PrimitiveList) { + auto list_type = + ::arrow::list(::arrow::field("item", 
::arrow::int32(), /*nullable=*/false)); + + auto arrow_array = ::arrow::json::ArrayFromJSONString(list_type, R"([ + [1, 2, 3, 4, 5], + [10, 20], + []])") + .ValueOrDie(); + + ArrowSchema c_schema; + ArrowArray c_array; + internal::ArrowSchemaGuard schema_guard(&c_schema); + internal::ArrowArrayGuard array_guard(&c_array); + ASSERT_TRUE(::arrow::ExportType(*list_type, &c_schema).ok()); + ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok()); + + auto array_like_result = ArrowArrayArrayLike::Make(c_schema, c_array); + ASSERT_THAT(array_like_result, IsOk()); + auto array_like = std::move(array_like_result.value()); + + constexpr int64_t kNumRows = 3; + std::array, kNumRows> expected_lists = { + std::vector{1, 2, 3, 4, 5}, + std::vector{10, 20}, + std::vector{}, /* the third row is an empty list */ + }; + + for (int64_t i = 0; i < kNumRows; ++i) { + ASSERT_THAT(array_like->Reset(i), IsOk()); + const auto& expected_list = expected_lists[i]; + ASSERT_EQ(array_like->size(), expected_list.size()); /* size() must match the list checked after Reset(i) */ + for (size_t j = 0; j < expected_list.size(); ++j) { + EXPECT_SCALAR_EQ(array_like->GetElement(j), int32_t, expected_list[j]); + } + } +} + +TEST(ArrowArrayStructLike, PrimitiveMap) { /* string keys and int32 values, three rows with the last one empty */ + auto map_type = std::make_shared<::arrow::MapType>( + ::arrow::field("key", ::arrow::utf8(), /*nullable=*/false), + ::arrow::field("value", ::arrow::int32(), /*nullable=*/false)); + + auto arrow_array = ::arrow::json::ArrayFromJSONString(map_type, R"([ + [["Foo", 1], ["Bar", 2]], + [["Baz", 1]], + []])") + .ValueOrDie(); + + ArrowSchema c_schema; + ArrowArray c_array; + internal::ArrowSchemaGuard schema_guard(&c_schema); + internal::ArrowArrayGuard array_guard(&c_array); + ASSERT_TRUE(::arrow::ExportType(*map_type, &c_schema).ok()); + ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok()); + + auto map_like_result = ArrowArrayMapLike::Make(c_schema, c_array); + ASSERT_THAT(map_like_result, IsOk()); + auto map_like = std::move(map_like_result.value()); + + constexpr int64_t kNumRows = 3; + std::array>, kNumRows> 
expected_maps = { + std::vector>{{"Foo", 1}, {"Bar", 2}}, + std::vector>{{"Baz", 1}}, + std::vector>{}, + }; + + for (int64_t i = 0; i < kNumRows; ++i) { + ASSERT_THAT(map_like->Reset(i), IsOk()); + const auto& expected_map = expected_maps[i]; + ASSERT_EQ(map_like->size(), expected_map.size()); + for (size_t j = 0; j < expected_map.size(); ++j) { /* key and value of entry j are read with the same position */ + EXPECT_SCALAR_EQ(map_like->GetKey(j), std::string_view, expected_map[j].first); + EXPECT_SCALAR_EQ(map_like->GetValue(j), int32_t, expected_map[j].second); + } + } +} + +} // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 09d836e8..a395b6e9 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -137,6 +137,13 @@ class Reader; class Writer; class StructLike; +class ArrayLike; +class MapLike; + +/// ---------------------------------------------------------------------------- +/// TODO: Forward declarations below are not added yet. +/// ---------------------------------------------------------------------------- + class MetadataUpdate; class UpdateRequirement; class AppendFiles;