Skip to content

Commit e7ddfe8

Browse files
committed
feat: add row-based immutable data structure
- Add StructLike, MapLike, and ArrayLike interfaces
- Add wrapper for ManifestFile and ArrowArray
1 parent 1cea1e9 commit e7ddfe8

12 files changed

+1312
-25
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ set(ICEBERG_SOURCES
3131
name_mapping.cc
3232
partition_field.cc
3333
partition_spec.cc
34+
row/arrow_array_wrapper.cc
35+
row/manifest_wrapper.cc
3436
schema.cc
3537
schema_field.cc
3638
schema_internal.cc
@@ -104,6 +106,7 @@ iceberg_install_all_headers(iceberg)
104106

105107
add_subdirectory(catalog)
106108
add_subdirectory(expression)
109+
add_subdirectory(row)
107110
add_subdirectory(util)
108111

109112
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h

src/iceberg/manifest_reader_internal.cc

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "iceberg/manifest_list.h"
2828
#include "iceberg/schema.h"
2929
#include "iceberg/type.h"
30+
#include "iceberg/util/checked_cast.h"
3031
#include "iceberg/util/macros.h"
3132

3233
namespace iceberg {
@@ -37,7 +38,7 @@ namespace iceberg {
3738
}
3839

3940
#define PARSE_PRIMITIVE_FIELD(item, array_view, type) \
40-
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
41+
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
4142
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
4243
auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx); \
4344
item = static_cast<type>(value); \
@@ -48,7 +49,7 @@ namespace iceberg {
4849
}
4950

5051
#define PARSE_STRING_FIELD(item, array_view) \
51-
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
52+
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
5253
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
5354
auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx); \
5455
item = std::string(value.data, value.size_bytes); \
@@ -59,7 +60,7 @@ namespace iceberg {
5960
}
6061

6162
#define PARSE_BINARY_FIELD(item, array_view) \
62-
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
63+
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
6364
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \
6465
item = ArrowArrayViewGetInt8Vector(array_view, row_idx); \
6566
} else if (required) { \
@@ -225,66 +226,67 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
225226
auto field_name = field.value()->get().name();
226227
bool required = !field.value()->get().optional();
227228
auto view_of_column = array_view.children[idx];
228-
switch (idx) {
229-
case 0:
229+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_file_field, ManifestFileFieldFromIndex(idx));
230+
switch (manifest_file_field) {
231+
case ManifestFileField::kManifestPath:
230232
PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, view_of_column);
231233
break;
232-
case 1:
234+
case ManifestFileField::kManifestLength:
233235
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, view_of_column,
234236
int64_t);
235237
break;
236-
case 2:
238+
case ManifestFileField::kPartitionSpecId:
237239
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, view_of_column,
238240
int32_t);
239241
break;
240-
case 3:
242+
case ManifestFileField::kContent:
241243
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
242244
ManifestFile::Content);
243245
break;
244-
case 4:
246+
case ManifestFileField::kSequenceNumber:
245247
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column,
246248
int64_t);
247249
break;
248-
case 5:
250+
case ManifestFileField::kMinSequenceNumber:
249251
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, view_of_column,
250252
int64_t);
251253
break;
252-
case 6:
254+
case ManifestFileField::kAddedSnapshotId:
253255
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, view_of_column,
254256
int64_t);
255257
break;
256-
case 7:
258+
case ManifestFileField::kAddedFilesCount:
257259
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, view_of_column,
258260
int32_t);
259261
break;
260-
case 8:
262+
case ManifestFileField::kExistingFilesCount:
261263
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
262264
view_of_column, int32_t);
263265
break;
264-
case 9:
266+
case ManifestFileField::kDeletedFilesCount:
265267
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, view_of_column,
266268
int32_t);
267269
break;
268-
case 10:
270+
case ManifestFileField::kAddedRowsCount:
269271
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, view_of_column,
270272
int64_t);
271273
break;
272-
case 11:
274+
case ManifestFileField::kExistingRowsCount:
273275
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, view_of_column,
274276
int64_t);
275277
break;
276-
case 12:
278+
case ManifestFileField::kDeletedRowsCount:
277279
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, view_of_column,
278280
int64_t);
279281
break;
280-
case 13:
282+
case ManifestFileField::kPartitionFieldSummary:
281283
ICEBERG_RETURN_UNEXPECTED(
282284
ParsePartitionFieldSummaryList(view_of_column, manifest_files));
283285
break;
284-
case 14:
286+
case ManifestFileField::kKeyMetadata:
285287
PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, view_of_column);
286288
break;
287-
case 15:
289+
case ManifestFileField::kFirstRowId:
288290
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, view_of_column,
289291
int64_t);
290292
break;
@@ -295,7 +297,7 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
295297
return manifest_files;
296298
}
297299

298-
Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx,
300+
Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
299301
std::vector<ManifestEntry>& manifest_entries) {
300302
if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
301303
auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
@@ -355,7 +357,7 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
355357
view_of_file_field);
356358
break;
357359
case 2:
358-
for (size_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) {
360+
for (int64_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) {
359361
if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
360362
auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, row_idx);
361363
std::string_view path_str(value.data, value.size_bytes);
@@ -510,7 +512,7 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
510512
break;
511513
case 4: {
512514
auto data_file_schema =
513-
dynamic_pointer_cast<StructType>(field.value()->get().type());
515+
internal::checked_pointer_cast<StructType>(field.value()->get().type());
514516
ICEBERG_RETURN_UNEXPECTED(
515517
ParseDataFile(data_file_schema, view_of_column, manifest_entries));
516518
break;
@@ -571,4 +573,11 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
571573
return manifest_files;
572574
}
573575

576+
/// \brief Translate a zero-based field index into its ManifestFileField.
///
/// \param index Candidate field position.
/// \return The enumerator whose underlying value equals `index` when
///         `0 <= index < kNextId`; otherwise an InvalidArgument error that
///         carries the offending index.
Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
  // kNextId is one past the last real field, so it acts as the exclusive bound.
  constexpr auto kFieldCount = static_cast<int32_t>(ManifestFileField::kNextId);

  const bool out_of_range = index < 0 || index >= kFieldCount;
  if (out_of_range) {
    return InvalidArgument("Invalid manifest file field index: {}", index);
  }
  return static_cast<ManifestFileField>(index);
}
582+
574583
} // namespace iceberg

src/iceberg/manifest_reader_internal.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,26 @@ class ManifestListReaderImpl : public ManifestListReader {
6060
std::unique_ptr<Reader> reader_;
6161
};
6262

63+
/// \brief Identifies a field of a manifest file entry by its position.
///
/// The underlying value of each enumerator is the zero-based index of the
/// corresponding field. Every value is pinned explicitly so that inserting or
/// reordering enumerators cannot silently shift the index of a later one.
enum class ManifestFileField : int32_t {
  kManifestPath = 0,
  kManifestLength = 1,
  kPartitionSpecId = 2,
  kContent = 3,
  kSequenceNumber = 4,
  kMinSequenceNumber = 5,
  kAddedSnapshotId = 6,
  kAddedFilesCount = 7,
  kExistingFilesCount = 8,
  kDeletedFilesCount = 9,
  kAddedRowsCount = 10,
  kExistingRowsCount = 11,
  kDeletedRowsCount = 12,
  kPartitionFieldSummary = 13,
  kKeyMetadata = 14,
  kFirstRowId = 15,
  // Sentinel: one past the last real field. Not a field itself; it serves as
  // the exclusive upper bound when validating an index.
  kNextId = 16,
};
82+
83+
/// \brief Convert a zero-based field index into a ManifestFileField.
///
/// \param index Field position to convert.
/// \return The enumerator with that underlying value when `index` lies in
///         `[0, ManifestFileField::kNextId)`, otherwise an InvalidArgument error.
Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index);
84+
6385
} // namespace iceberg

src/iceberg/row/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Presumably registers the headers under iceberg/row for installation, mirroring
# the iceberg_install_all_headers(iceberg) call in the parent directory —
# confirm against the helper's definition.
iceberg_install_all_headers(iceberg/row)

0 commit comments

Comments
 (0)