Skip to content

Commit 9122ddc

Browse files
drymancopybara-github
authored andcommitted
Internal changes.
PiperOrigin-RevId: 480067689
1 parent 68d6e0f commit 9122ddc

7 files changed

+205
-8
lines changed

cpp/array_record_reader.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ struct ArrayRecordReaderBase::ArrayRecordReaderState {
125125
std::queue<IndexedPair<std::future<std::vector<ChunkDecoder>>>>
126126
future_decoders;
127127

128+
// Writer options for debugging purposes.
129+
std::optional<std::string> writer_options = std::nullopt;
130+
128131
uint64_t ChunkEndOffset(uint64_t chunk_idx) const {
129132
if (chunk_idx == footer.size() - 1) {
130133
return footer_offset;
@@ -263,6 +266,10 @@ void ArrayRecordReaderBase::Initialize() {
263266
return;
264267
}
265268
state_->num_records = footer_metadata.array_record_metadata().num_records();
269+
if (footer_metadata.array_record_metadata().has_writer_options()) {
270+
state_->writer_options =
271+
footer_metadata.array_record_metadata().writer_options();
272+
}
266273
}
267274
{
268275
AR_ENDO_SCOPE("Reading footer body");
@@ -757,4 +764,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
757764
return true;
758765
}
759766

767+
std::optional<std::string> ArrayRecordReaderBase::WriterOptionsString() const {
768+
return state_->writer_options;
769+
}
770+
760771
} // namespace array_record

cpp/array_record_reader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ class ArrayRecordReaderBase : public riegeli::Object {
269269
// `false` (when `!ok()`) - failure
270270
bool ReadRecord(absl::string_view* record);
271271

272+
// Returns the writer options if presented.
273+
std::optional<std::string> WriterOptionsString() const;
274+
272275
protected:
273276
explicit ArrayRecordReaderBase(Options options, ARThreadPool* pool);
274277
~ArrayRecordReaderBase() override;

cpp/array_record_reader_test.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
110110
// Once a reader is moved, it is closed.
111111
ASSERT_FALSE(reader_before_move.is_open()); // NOLINT
112112

113+
auto recorded_writer_options = ArrayRecordWriterBase::Options::FromString(
114+
reader.WriterOptionsString().value())
115+
.value();
116+
EXPECT_EQ(writer_options.compression_type(),
117+
recorded_writer_options.compression_type());
118+
EXPECT_EQ(writer_options.compression_level(),
119+
recorded_writer_options.compression_level());
120+
EXPECT_EQ(writer_options.transpose(), recorded_writer_options.transpose());
121+
113122
std::vector<uint64_t> indices = {1, 2, 4};
114123
ASSERT_TRUE(reader
115124
.ParallelReadRecordsWithIndices(

cpp/array_record_writer.cc

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ limitations under the License.
3131
#include "absl/base/thread_annotations.h"
3232
#include "absl/status/status.h"
3333
#include "absl/status/statusor.h"
34+
#include "absl/strings/match.h"
3435
#include "absl/strings/string_view.h"
3536
#include "absl/synchronization/mutex.h"
3637
#include "absl/types/span.h"
@@ -63,6 +64,8 @@ constexpr uint32_t kZstdDefaultWindowLog = 20;
6364
// Generated from `echo 'ArrayRecord' | md5sum | cut -b 1-16`
6465
constexpr uint64_t kMagic = 0x71930e704fdae05eULL;
6566

67+
constexpr char kArrayRecordDefaultCompression[] = "zstd:3";
68+
6669
using riegeli::Chunk;
6770
using riegeli::ChunkType;
6871
using riegeli::CompressorOptions;
@@ -109,11 +112,17 @@ absl::StatusOr<Chunk> ChunkFromSpan(CompressorOptions compression_options,
109112

110113
} // namespace
111114

115+
ArrayRecordWriterBase::Options::Options() {
116+
DCHECK_OK(
117+
this->compressor_options_.FromString(kArrayRecordDefaultCompression));
118+
}
119+
112120
// static
113121
absl::StatusOr<ArrayRecordWriterBase::Options>
114122
ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
115123
ArrayRecordWriterBase::Options options;
116124
OptionsParser options_parser;
125+
options_parser.AddOption("default", ValueParser::FailIfAnySeen());
117126
// Group
118127
options_parser.AddOption(
119128
"group_size", ValueParser::Int(1, INT32_MAX, &options.group_size_));
@@ -151,6 +160,15 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
151160
if (!options_parser.FromString(text)) {
152161
return options_parser.status();
153162
}
163+
// From our benchmarks we figured zstd:3 gives the best trade-off for both the
164+
// compression and decomopression speed.
165+
if (text == "default" ||
166+
(!absl::StrContains(compressor_text, "uncompressed") &&
167+
!absl::StrContains(compressor_text, "brotli") &&
168+
!absl::StrContains(compressor_text, "snappy") &&
169+
!absl::StrContains(compressor_text, "zstd"))) {
170+
absl::StrAppend(&compressor_text, ",", kArrayRecordDefaultCompression);
171+
}
154172
// max_parallelism is set after options_parser.FromString()
155173
if (max_parallelism > 0) {
156174
options.set_max_parallelism(max_parallelism);
@@ -167,13 +185,48 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
167185
return options;
168186
}
169187

188+
std::string ArrayRecordWriterBase::Options::ToString() const {
189+
std::string option;
190+
absl::StrAppend(&option, "group_size:", this->group_size_,
191+
",transpose:", this->transpose_ ? "true" : "false",
192+
",pad_to_block_boundary:",
193+
this->pad_to_block_boundary_ ? "true" : "false");
194+
if (this->transpose_) {
195+
absl::StrAppend(&option,
196+
",transpose_bucket_size:", this->transpose_bucket_size_);
197+
}
198+
switch (this->compressor_options().compression_type()) {
199+
case riegeli::CompressionType::kNone:
200+
absl::StrAppend(&option, ",uncompressed");
201+
break;
202+
case riegeli::CompressionType::kBrotli:
203+
absl::StrAppend(
204+
&option, ",brotli:", this->compressor_options().compression_level());
205+
break;
206+
case riegeli::CompressionType::kZstd:
207+
absl::StrAppend(&option,
208+
",zstd:", this->compressor_options().compression_level());
209+
break;
210+
case riegeli::CompressionType::kSnappy:
211+
absl::StrAppend(&option, ",snappy");
212+
break;
213+
}
214+
if (this->compressor_options().window_log().has_value()) {
215+
absl::StrAppend(&option, ",window_log:",
216+
this->compressor_options().window_log().value());
217+
}
218+
if (max_parallelism_.has_value()) {
219+
absl::StrAppend(&option, ",max_parallelism:", max_parallelism_.value());
220+
}
221+
return option;
222+
}
223+
170224
// Thread compatible callback guarded by SequencedChunkWriter's mutex.
171225
class ArrayRecordWriterBase::SubmitChunkCallback
172226
: public SequencedChunkWriterBase::SubmitChunkCallback {
173227
public:
174228
explicit SubmitChunkCallback(const ArrayRecordWriterBase::Options options)
175-
: compression_options_(options.compressor_options()),
176-
max_parallelism_(options.max_parallelism().value()) {
229+
: options_(options), max_parallelism_(options.max_parallelism().value()) {
177230
constexpr uint64_t kDefaultDecodedDataSize = (1 << 20);
178231
last_decoded_data_size_.store(kDefaultDecodedDataSize);
179232
}
@@ -200,7 +253,7 @@ class ArrayRecordWriterBase::SubmitChunkCallback
200253
void WriteFooterAndPostscript(SequencedChunkWriterBase* writer);
201254

202255
private:
203-
const CompressorOptions compression_options_;
256+
const Options options_;
204257

205258
absl::Mutex mu_;
206259
const int32_t max_parallelism_;
@@ -456,8 +509,11 @@ ArrayRecordWriterBase::SubmitChunkCallback::CreateFooterChunk() {
456509
footer_metadata.mutable_array_record_metadata()->set_num_chunks(
457510
array_footer_.size());
458511
footer_metadata.mutable_array_record_metadata()->set_num_records(num_records);
512+
footer_metadata.mutable_array_record_metadata()->set_writer_options(
513+
options_.ToString());
459514
// Perhaps we can compress the footer
460-
return ChunkFromSpan(compression_options_, absl::MakeConstSpan(array_footer_),
515+
return ChunkFromSpan(options_.compressor_options(),
516+
absl::MakeConstSpan(array_footer_),
461517
std::optional(footer_metadata));
462518
}
463519

cpp/array_record_writer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
8383
public:
8484
class Options {
8585
public:
86-
Options() {}
86+
Options();
8787

8888
// Parses options from text:
8989
// ```
@@ -268,6 +268,9 @@ class ArrayRecordWriterBase : public riegeli::Object {
268268
return metadata_;
269269
}
270270

271+
// Serialize the options to a string.
272+
std::string ToString() const;
273+
271274
private:
272275
int32_t group_size_ = kDefaultGroupSize;
273276
riegeli::CompressorOptions compressor_options_;

cpp/array_record_writer_test.cc

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,58 @@ INSTANTIATE_TEST_SUITE_P(
193193
testing::Bool(), testing::Bool(), testing::Bool()));
194194

195195
TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
196+
{
197+
auto option = ArrayRecordWriterBase::Options();
198+
EXPECT_EQ(option.group_size(),
199+
ArrayRecordWriterBase::Options::kDefaultGroupSize);
200+
EXPECT_FALSE(option.transpose());
201+
EXPECT_EQ(option.max_parallelism(), std::nullopt);
202+
EXPECT_EQ(option.compressor_options().compression_type(),
203+
riegeli::CompressionType::kZstd);
204+
EXPECT_EQ(option.compressor_options().compression_level(), 3);
205+
EXPECT_FALSE(option.pad_to_block_boundary());
206+
}
196207
{
197208
auto option = ArrayRecordWriterBase::Options::FromString("").value();
198209
EXPECT_EQ(option.group_size(),
199210
ArrayRecordWriterBase::Options::kDefaultGroupSize);
200211
EXPECT_FALSE(option.transpose());
201212
EXPECT_EQ(option.max_parallelism(), std::nullopt);
202213
EXPECT_EQ(option.compressor_options().compression_type(),
203-
riegeli::CompressionType::kBrotli);
214+
riegeli::CompressionType::kZstd);
215+
EXPECT_EQ(option.compressor_options().compression_level(), 3);
216+
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
217+
EXPECT_FALSE(option.pad_to_block_boundary());
218+
219+
EXPECT_EQ(option.ToString(),
220+
"group_size:65536,"
221+
"transpose:false,"
222+
"pad_to_block_boundary:false,"
223+
"zstd:3,"
224+
"window_log:20");
225+
EXPECT_TRUE(
226+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
227+
}
228+
{
229+
auto option = ArrayRecordWriterBase::Options::FromString("default").value();
230+
EXPECT_EQ(option.group_size(),
231+
ArrayRecordWriterBase::Options::kDefaultGroupSize);
232+
EXPECT_FALSE(option.transpose());
233+
EXPECT_EQ(option.max_parallelism(), std::nullopt);
234+
EXPECT_EQ(option.compressor_options().compression_type(),
235+
riegeli::CompressionType::kZstd);
236+
EXPECT_EQ(option.compressor_options().compression_level(), 3);
237+
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
204238
EXPECT_FALSE(option.pad_to_block_boundary());
239+
240+
EXPECT_EQ(option.ToString(),
241+
"group_size:65536,"
242+
"transpose:false,"
243+
"pad_to_block_boundary:false,"
244+
"zstd:3,"
245+
"window_log:20");
246+
EXPECT_TRUE(
247+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
205248
}
206249
{
207250
auto option = ArrayRecordWriterBase::Options::FromString(
@@ -210,10 +253,42 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
210253
EXPECT_EQ(option.group_size(), 32);
211254
EXPECT_TRUE(option.transpose());
212255
EXPECT_EQ(option.max_parallelism(), std::nullopt);
256+
EXPECT_EQ(option.compressor_options().compression_type(),
257+
riegeli::CompressionType::kZstd);
258+
EXPECT_EQ(option.compressor_options().window_log(), 20);
259+
EXPECT_FALSE(option.pad_to_block_boundary());
260+
261+
EXPECT_EQ(option.ToString(),
262+
"group_size:32,"
263+
"transpose:true,"
264+
"pad_to_block_boundary:false,"
265+
"transpose_bucket_size:256,"
266+
"zstd:3,"
267+
"window_log:20");
268+
EXPECT_TRUE(
269+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
270+
}
271+
{
272+
auto option = ArrayRecordWriterBase::Options::FromString(
273+
"brotli:6,group_size:32,transpose,window_log:25")
274+
.value();
275+
EXPECT_EQ(option.group_size(), 32);
276+
EXPECT_TRUE(option.transpose());
277+
EXPECT_EQ(option.max_parallelism(), std::nullopt);
213278
EXPECT_EQ(option.compressor_options().compression_type(),
214279
riegeli::CompressionType::kBrotli);
215-
EXPECT_EQ(option.compressor_options().brotli_window_log(), 20);
280+
EXPECT_EQ(option.compressor_options().window_log(), 25);
216281
EXPECT_FALSE(option.pad_to_block_boundary());
282+
283+
EXPECT_EQ(option.ToString(),
284+
"group_size:32,"
285+
"transpose:true,"
286+
"pad_to_block_boundary:false,"
287+
"transpose_bucket_size:256,"
288+
"brotli:6,"
289+
"window_log:25");
290+
EXPECT_TRUE(
291+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
217292
}
218293
{
219294
auto option = ArrayRecordWriterBase::Options::FromString(
@@ -224,9 +299,19 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
224299
EXPECT_EQ(option.max_parallelism(), std::nullopt);
225300
EXPECT_EQ(option.compressor_options().compression_type(),
226301
riegeli::CompressionType::kZstd);
227-
EXPECT_EQ(option.compressor_options().zstd_window_log(), 20);
302+
EXPECT_EQ(option.compressor_options().window_log(), 20);
228303
EXPECT_EQ(option.compressor_options().compression_level(), 5);
229304
EXPECT_FALSE(option.pad_to_block_boundary());
305+
306+
EXPECT_EQ(option.ToString(),
307+
"group_size:32,"
308+
"transpose:true,"
309+
"pad_to_block_boundary:false,"
310+
"transpose_bucket_size:256,"
311+
"zstd:5,"
312+
"window_log:20");
313+
EXPECT_TRUE(
314+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
230315
}
231316
{
232317
auto option = ArrayRecordWriterBase::Options::FromString(
@@ -239,6 +324,34 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
239324
EXPECT_EQ(option.compressor_options().compression_type(),
240325
riegeli::CompressionType::kNone);
241326
EXPECT_TRUE(option.pad_to_block_boundary());
327+
328+
EXPECT_EQ(option.ToString(),
329+
"group_size:65536,"
330+
"transpose:false,"
331+
"pad_to_block_boundary:true,"
332+
"uncompressed");
333+
EXPECT_TRUE(
334+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
335+
}
336+
{
337+
auto option = ArrayRecordWriterBase::Options::FromString(
338+
"snappy,pad_to_block_boundary:true")
339+
.value();
340+
EXPECT_EQ(option.group_size(),
341+
ArrayRecordWriterBase::Options::kDefaultGroupSize);
342+
EXPECT_FALSE(option.transpose());
343+
EXPECT_EQ(option.max_parallelism(), std::nullopt);
344+
EXPECT_EQ(option.compressor_options().compression_type(),
345+
riegeli::CompressionType::kSnappy);
346+
EXPECT_TRUE(option.pad_to_block_boundary());
347+
348+
EXPECT_EQ(option.ToString(),
349+
"group_size:65536,"
350+
"transpose:false,"
351+
"pad_to_block_boundary:true,"
352+
"snappy");
353+
EXPECT_TRUE(
354+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
242355
}
243356
}
244357

cpp/layout.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ message RiegeliFooterMetadata {
8282
optional uint32 version = 1;
8383
optional uint64 num_chunks = 2;
8484
optional uint64 num_records = 3;
85+
// Writer options for debugging purposes.
86+
optional string writer_options = 4;
8587
}
8688

8789
oneof metadata {

0 commit comments

Comments
 (0)