Skip to content

Commit 90d324e

Browse files
committed
feat: implement DataWriter for Iceberg data files
Implements DataWriter class for writing Iceberg data files as part of issue #441 (task 2). Implementation: - Static factory method DataWriter::Make() for creating writer instances - Support for Parquet and Avro file formats via WriterFactoryRegistry - Complete DataFile metadata generation including partition info, column statistics, serialized bounds, and sort order ID - Proper lifecycle management with Write/Close/Metadata methods - Idempotent Close() - multiple calls succeed (no-op after first) - PIMPL idiom for ABI stability - Not thread-safe (documented) Tests: - 13 comprehensive unit tests including parameterized format tests - Coverage: creation, write/close lifecycle, metadata generation, error handling, feature validation, and data size verification - All tests passing (13/13) Related to #441
1 parent 43b83c5 commit 90d324e

File tree

3 files changed

+530
-5
lines changed

3 files changed

+530
-5
lines changed

src/iceberg/data/data_writer.cc

Lines changed: 111 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,127 @@
1919

2020
#include "iceberg/data/data_writer.h"
2121

22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/manifest/manifest_entry.h"
24+
#include "iceberg/util/macros.h"
25+
2226
namespace iceberg {
2327

2428
class DataWriter::Impl {
2529
public:
30+
static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
31+
WriterOptions writer_options;
32+
writer_options.path = options.path;
33+
writer_options.schema = options.schema;
34+
writer_options.io = options.io;
35+
writer_options.properties = WriterProperties::FromMap(options.properties);
36+
37+
ICEBERG_ASSIGN_OR_RAISE(
38+
auto writer, WriterFactoryRegistry::Open(options.format, writer_options));
39+
40+
return std::unique_ptr<Impl>(
41+
new Impl(std::move(options), std::move(writer)));
42+
}
43+
44+
Status Write(ArrowArray* data) {
45+
if (!writer_) {
46+
return InvalidArgument("Writer not initialized");
47+
}
48+
return writer_->Write(data);
49+
}
50+
51+
Result<int64_t> Length() const {
52+
if (!writer_) {
53+
return InvalidArgument("Writer not initialized");
54+
}
55+
return writer_->length();
56+
}
57+
58+
Status Close() {
59+
if (!writer_) {
60+
return InvalidArgument("Writer not initialized");
61+
}
62+
if (closed_) {
63+
// Idempotent: no-op if already closed
64+
return {};
65+
}
66+
ICEBERG_RETURN_UNEXPECTED(writer_->Close());
67+
closed_ = true;
68+
return {};
69+
}
70+
71+
Result<FileWriter::WriteResult> Metadata() {
72+
if (!closed_) {
73+
return InvalidArgument("Cannot get metadata before closing the writer");
74+
}
75+
76+
ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
77+
ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
78+
auto split_offsets = writer_->split_offsets();
79+
80+
auto data_file = std::make_shared<DataFile>();
81+
data_file->content = DataFile::Content::kData;
82+
data_file->file_path = options_.path;
83+
data_file->file_format = options_.format;
84+
data_file->partition = options_.partition;
85+
data_file->record_count = metrics.row_count.value_or(0);
86+
data_file->file_size_in_bytes = length;
87+
data_file->sort_order_id = options_.sort_order_id;
88+
data_file->split_offsets = std::move(split_offsets);
89+
90+
// Convert metrics maps from unordered_map to map
91+
for (const auto& [col_id, size] : metrics.column_sizes) {
92+
data_file->column_sizes[col_id] = size;
93+
}
94+
for (const auto& [col_id, count] : metrics.value_counts) {
95+
data_file->value_counts[col_id] = count;
96+
}
97+
for (const auto& [col_id, count] : metrics.null_value_counts) {
98+
data_file->null_value_counts[col_id] = count;
99+
}
100+
for (const auto& [col_id, count] : metrics.nan_value_counts) {
101+
data_file->nan_value_counts[col_id] = count;
102+
}
103+
104+
// Serialize literal bounds to binary format
105+
for (const auto& [col_id, literal] : metrics.lower_bounds) {
106+
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
107+
data_file->lower_bounds[col_id] = std::move(serialized);
108+
}
109+
for (const auto& [col_id, literal] : metrics.upper_bounds) {
110+
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
111+
data_file->upper_bounds[col_id] = std::move(serialized);
112+
}
113+
114+
FileWriter::WriteResult result;
115+
result.data_files.push_back(std::move(data_file));
116+
return result;
117+
}
118+
119+
private:
120+
Impl(DataWriterOptions options, std::unique_ptr<Writer> writer)
121+
: options_(std::move(options)), writer_(std::move(writer)) {}
122+
123+
DataWriterOptions options_;
124+
std::unique_ptr<Writer> writer_;
125+
bool closed_ = false;
26126
};
27127

128+
DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
129+
28130
DataWriter::~DataWriter() = default;
29131

30-
Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); }
132+
Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
133+
ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
134+
return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl)));
135+
}
136+
137+
Status DataWriter::Write(ArrowArray* data) { return impl_->Write(data); }
31138

32-
Result<int64_t> DataWriter::Length() const { return NotImplemented(""); }
139+
Result<int64_t> DataWriter::Length() const { return impl_->Length(); }
33140

34-
Status DataWriter::Close() { return NotImplemented(""); }
141+
Status DataWriter::Close() { return impl_->Close(); }
35142

36-
Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); }
143+
Result<FileWriter::WriteResult> DataWriter::Metadata() { return impl_->Metadata(); }
37144

38145
} // namespace iceberg

src/iceberg/data/data_writer.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,16 @@ struct ICEBERG_EXPORT DataWriterOptions {
5151
};
5252

5353
/// \brief Writer for Iceberg data files.
54+
///
55+
/// This class is not thread-safe. Concurrent calls to Write(), Close(), or Metadata()
56+
/// from multiple threads may result in undefined behavior.
5457
class ICEBERG_EXPORT DataWriter : public FileWriter {
5558
public:
5659
~DataWriter() override;
5760

61+
/// \brief Create a new DataWriter instance.
62+
static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);
63+
5864
Status Write(ArrowArray* data) override;
5965
Result<int64_t> Length() const override;
6066
Status Close() override;
@@ -63,6 +69,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
6369
private:
6470
class Impl;
6571
std::unique_ptr<Impl> impl_;
72+
73+
explicit DataWriter(std::unique_ptr<Impl> impl);
6674
};
6775

6876
} // namespace iceberg

0 commit comments

Comments
 (0)