Skip to content

Commit f2d0abd

Browse files
authored
fix: correct partition field handling for non-partitioned tables and add test for manifest file reader (#175)
- Fix ManifestEntry::operator== logic with proper parentheses grouping
- Add null pointer safety in DataFile::Type() for non-partitioned tables
- Change partition field type validation from list to struct in manifest reader
- Support empty partition structs for non-partitioned tables (n_children == 0)
- Update the manifest file unit tests: separate the v1 and v2 tests, and add another v2 test for non-partitioned tables
1 parent bc87f00 commit f2d0abd

File tree

5 files changed

+106
-18
lines changed

5 files changed

+106
-18
lines changed

src/iceberg/manifest_entry.cc

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -22,20 +22,24 @@
2222
#include <memory>
2323
#include <vector>
2424

25+
#include "iceberg/schema.h"
2526
#include "iceberg/schema_field.h"
2627
#include "iceberg/type.h"
2728

2829
namespace iceberg {
2930

3031
bool ManifestEntry::operator==(const ManifestEntry& other) const {
3132
return status == other.status && snapshot_id == other.snapshot_id &&
32-
sequence_number == other.sequence_number &&
33-
file_sequence_number == other.file_sequence_number &&
34-
(data_file && other.data_file && *data_file == *other.data_file) ||
35-
(!data_file && !other.data_file);
33+
sequence_number == other.sequence_number &&
34+
file_sequence_number == other.file_sequence_number &&
35+
((data_file && other.data_file && *data_file == *other.data_file) ||
36+
(!data_file && !other.data_file));
3637
}
3738

3839
std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
40+
if (!partition_type) {
41+
partition_type = PartitionSpec::Unpartitioned()->schema();
42+
}
3943
return std::make_shared<StructType>(std::vector<SchemaField>{
4044
kContent,
4145
kFilePath,

src/iceberg/manifest_reader_internal.cc

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -368,15 +368,17 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
368368
break;
369369
case 3: {
370370
if (view_of_file_field->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
371-
return InvalidManifest("Field:{} should be a list.", field_name);
371+
return InvalidManifest("Field:{} should be a struct.", field_name);
372372
}
373-
auto view_of_partition = view_of_file_field->children[0];
374-
for (size_t row_idx = 0; row_idx < view_of_partition->length; row_idx++) {
375-
if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
376-
break;
373+
if (view_of_file_field->n_children > 0) {
374+
auto view_of_partition = view_of_file_field->children[0];
375+
for (int64_t row_idx = 0; row_idx < view_of_partition->length; row_idx++) {
376+
if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
377+
break;
378+
}
379+
ICEBERG_RETURN_UNEXPECTED(
380+
ParseLiteral(view_of_partition, row_idx, manifest_entries));
377381
}
378-
ICEBERG_RETURN_UNEXPECTED(
379-
ParseLiteral(view_of_partition, row_idx, manifest_entries));
380382
}
381383
} break;
382384
case 4:

src/iceberg/partition_spec.cc

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,8 +46,8 @@ PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
4646
const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
4747
static const std::shared_ptr<PartitionSpec> unpartitioned =
4848
std::make_shared<PartitionSpec>(
49-
/*schema=*/nullptr, kInitialSpecId, std::vector<PartitionField>{},
50-
kLegacyPartitionDataIdStart - 1);
49+
/*schema=*/std::make_shared<Schema>(std::vector<SchemaField>{}), kInitialSpecId,
50+
std::vector<PartitionField>{}, kLegacyPartitionDataIdStart - 1);
5151
return unpartitioned;
5252
}
5353

test/manifest_reader_test.cc

Lines changed: 87 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,21 +19,22 @@
1919

2020
#include "iceberg/manifest_reader.h"
2121

22+
#include <cstddef>
23+
2224
#include <arrow/filesystem/localfs.h>
2325
#include <gtest/gtest.h>
2426

2527
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2628
#include "iceberg/avro/avro_reader.h"
2729
#include "iceberg/avro/avro_register.h"
28-
#include "iceberg/avro/avro_schema_util_internal.h"
2930
#include "iceberg/manifest_entry.h"
3031
#include "iceberg/schema.h"
3132
#include "temp_file_test_base.h"
3233
#include "test_common.h"
3334

3435
namespace iceberg {
3536

36-
class ManifestReaderTest : public TempFileTestBase {
37+
class ManifestReaderV1Test : public TempFileTestBase {
3738
protected:
3839
static void SetUpTestSuite() { avro::AvroReader::Register(); }
3940

@@ -45,7 +46,7 @@ class ManifestReaderTest : public TempFileTestBase {
4546
avro::RegisterLogicalTypes();
4647
}
4748

48-
std::vector<ManifestEntry> prepare_manifest_entries() {
49+
std::vector<ManifestEntry> PrepareV1ManifestEntries() {
4950
std::vector<ManifestEntry> manifest_entries;
5051
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/data/";
5152
std::vector<std::string> paths = {
@@ -102,7 +103,7 @@ class ManifestReaderTest : public TempFileTestBase {
102103
std::shared_ptr<FileIO> file_io_;
103104
};
104105

105-
TEST_F(ManifestReaderTest, BasicTest) {
106+
TEST_F(ManifestReaderV1Test, V1PartitionedBasicTest) {
106107
iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true);
107108
auto partition_schema =
108109
std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
@@ -115,7 +116,88 @@ TEST_F(ManifestReaderTest, BasicTest) {
115116
auto read_result = manifest_reader->Entries();
116117
ASSERT_EQ(read_result.has_value(), true) << read_result.error().message;
117118

118-
auto expected_entries = prepare_manifest_entries();
119+
auto expected_entries = PrepareV1ManifestEntries();
120+
ASSERT_EQ(read_result.value(), expected_entries);
121+
}
122+
123+
class ManifestReaderV2Test : public TempFileTestBase {
124+
protected:
125+
static void SetUpTestSuite() { avro::AvroReader::Register(); }
126+
127+
void SetUp() override {
128+
TempFileTestBase::SetUp();
129+
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
130+
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
131+
132+
avro::RegisterLogicalTypes();
133+
}
134+
135+
std::vector<ManifestEntry> PrepareV2NonPartitionedManifestEntries() {
136+
std::vector<ManifestEntry> manifest_entries;
137+
std::string test_dir_prefix = "/tmp/db/db/v2_manifest_non_partitioned/data/";
138+
139+
std::vector<std::string> paths = {
140+
"00000-0-b0f98903-6d21-45fd-9e0b-afbd4963e365-0-00001.parquet"};
141+
142+
std::vector<int64_t> file_sizes = {1344};
143+
std::vector<int64_t> record_counts = {4};
144+
145+
std::vector<std::map<int32_t, std::vector<uint8_t>>> lower_bounds = {
146+
{{1, {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
147+
{2, {'r', 'e', 'c', 'o', 'r', 'd', '_', 'f', 'o', 'u', 'r'}},
148+
{3, {'d', 'a', 't', 'a', '_', 'c', 'o', 'n', 't', 'e', 'n', 't', '_', '1'}},
149+
{4, {0xcd, 0xcc, 0xcc, 0xcc, 0xcc, 0xdc, 0x5e, 0x40}}}};
150+
151+
std::vector<std::map<int32_t, std::vector<uint8_t>>> upper_bounds = {
152+
{{1, {0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
153+
{2, {'r', 'e', 'c', 'o', 'r', 'd', '_', 't', 'w', 'o'}},
154+
{3, {'d', 'a', 't', 'a', '_', 'c', 'o', 'n', 't', 'e', 'n', 't', '_', '4'}},
155+
{4, {0x14, 0xae, 0x47, 0xe1, 0x7a, 0x8c, 0x7c, 0x40}}}};
156+
157+
manifest_entries.emplace_back(
158+
ManifestEntry{.status = ManifestStatus::kAdded,
159+
.snapshot_id = 679879563479918846LL,
160+
.sequence_number = std::nullopt,
161+
.file_sequence_number = std::nullopt,
162+
.data_file = std::make_shared<DataFile>(
163+
DataFile{.file_path = test_dir_prefix + paths[0],
164+
.file_format = FileFormatType::kParquet,
165+
.record_count = record_counts[0],
166+
.file_size_in_bytes = file_sizes[0],
167+
.column_sizes = {{1, 56}, {2, 73}, {3, 66}, {4, 67}},
168+
.value_counts = {{1, 4}, {2, 4}, {3, 4}, {4, 4}},
169+
.null_value_counts = {{1, 0}, {2, 0}, {3, 0}, {4, 0}},
170+
.nan_value_counts = {{4, 0}},
171+
.lower_bounds = lower_bounds[0],
172+
.upper_bounds = upper_bounds[0],
173+
.key_metadata = {},
174+
.split_offsets = {4},
175+
.equality_ids = {},
176+
.sort_order_id = 0,
177+
.first_row_id = std::nullopt,
178+
.referenced_data_file = std::nullopt,
179+
.content_offset = std::nullopt,
180+
.content_size_in_bytes = std::nullopt})});
181+
return manifest_entries;
182+
}
183+
184+
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
185+
std::shared_ptr<FileIO> file_io_;
186+
};
187+
188+
TEST_F(ManifestReaderV2Test, V2NonPartitionedBasicTest) {
189+
std::string path = GetResourcePath("2ddf1bc9-830b-4015-aced-c060df36f150-m0.avro");
190+
191+
auto manifest_reader_result = ManifestReader::MakeReader(path, file_io_, nullptr);
192+
ASSERT_EQ(manifest_reader_result.has_value(), true)
193+
<< manifest_reader_result.error().message;
194+
195+
auto manifest_reader = std::move(manifest_reader_result.value());
196+
auto read_result = manifest_reader->Entries();
197+
ASSERT_EQ(read_result.has_value(), true) << read_result.error().message;
198+
ASSERT_EQ(read_result.value().size(), 1);
199+
200+
auto expected_entries = PrepareV2NonPartitionedManifestEntries();
119201
ASSERT_EQ(read_result.value(), expected_entries);
120202
}
121203

7.04 KB
Binary file not shown.

0 commit comments

Comments
 (0)