Skip to content

Commit f8a72d5

Browse files
dryman authored and copybara-github committed
Internal changes.
PiperOrigin-RevId: 480067689
1 parent 68d6e0f commit f8a72d5

7 files changed

+207
-8
lines changed

cpp/array_record_reader.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ struct ArrayRecordReaderBase::ArrayRecordReaderState {
125125
std::queue<IndexedPair<std::future<std::vector<ChunkDecoder>>>>
126126
future_decoders;
127127

128+
// Writer options for debugging purposes.
129+
std::optional<std::string> writer_options = std::nullopt;
130+
128131
uint64_t ChunkEndOffset(uint64_t chunk_idx) const {
129132
if (chunk_idx == footer.size() - 1) {
130133
return footer_offset;
@@ -263,6 +266,10 @@ void ArrayRecordReaderBase::Initialize() {
263266
return;
264267
}
265268
state_->num_records = footer_metadata.array_record_metadata().num_records();
269+
if (footer_metadata.array_record_metadata().has_writer_options()) {
270+
state_->writer_options =
271+
footer_metadata.array_record_metadata().writer_options();
272+
}
266273
}
267274
{
268275
AR_ENDO_SCOPE("Reading footer body");
@@ -757,4 +764,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
757764
return true;
758765
}
759766

767+
std::optional<std::string> ArrayRecordReaderBase::WriterOptionsString() const {
768+
return state_->writer_options;
769+
}
770+
760771
} // namespace array_record

cpp/array_record_reader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ class ArrayRecordReaderBase : public riegeli::Object {
269269
// `false` (when `!ok()`) - failure
270270
bool ReadRecord(absl::string_view* record);
271271

272+
// Returns the writer options if present.
273+
std::optional<std::string> WriterOptionsString() const;
274+
272275
protected:
273276
explicit ArrayRecordReaderBase(Options options, ARThreadPool* pool);
274277
~ArrayRecordReaderBase() override;

cpp/array_record_reader_test.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
110110
// Once a reader is moved, it is closed.
111111
ASSERT_FALSE(reader_before_move.is_open()); // NOLINT
112112

113+
auto recorded_writer_options = ArrayRecordWriterBase::Options::FromString(
114+
reader.WriterOptionsString().value())
115+
.value();
116+
EXPECT_EQ(writer_options.compression_type(),
117+
recorded_writer_options.compression_type());
118+
EXPECT_EQ(writer_options.compression_level(),
119+
recorded_writer_options.compression_level());
120+
EXPECT_EQ(writer_options.transpose(), recorded_writer_options.transpose());
121+
113122
std::vector<uint64_t> indices = {1, 2, 4};
114123
ASSERT_TRUE(reader
115124
.ParallelReadRecordsWithIndices(

cpp/array_record_writer.cc

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ limitations under the License.
3131
#include "absl/base/thread_annotations.h"
3232
#include "absl/status/status.h"
3333
#include "absl/status/statusor.h"
34+
#include "absl/strings/match.h"
3435
#include "absl/strings/string_view.h"
3536
#include "absl/synchronization/mutex.h"
3637
#include "absl/types/span.h"
@@ -63,6 +64,8 @@ constexpr uint32_t kZstdDefaultWindowLog = 20;
6364
// Generated from `echo 'ArrayRecord' | md5sum | cut -b 1-16`
6465
constexpr uint64_t kMagic = 0x71930e704fdae05eULL;
6566

67+
constexpr char kArrayRecordDefaultCompression[] = "zstd:3";
68+
6669
using riegeli::Chunk;
6770
using riegeli::ChunkType;
6871
using riegeli::CompressorOptions;
@@ -109,11 +112,18 @@ absl::StatusOr<Chunk> ChunkFromSpan(CompressorOptions compression_options,
109112

110113
} // namespace
111114

115+
ArrayRecordWriterBase::Options::Options() {
116+
DCHECK_OK(
117+
this->compressor_options_.FromString(kArrayRecordDefaultCompression));
118+
this->compressor_options_.set_window_log(kZstdDefaultWindowLog);
119+
}
120+
112121
// static
113122
absl::StatusOr<ArrayRecordWriterBase::Options>
114123
ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
115124
ArrayRecordWriterBase::Options options;
116125
OptionsParser options_parser;
126+
options_parser.AddOption("default", ValueParser::FailIfAnySeen());
117127
// Group
118128
options_parser.AddOption(
119129
"group_size", ValueParser::Int(1, INT32_MAX, &options.group_size_));
@@ -151,6 +161,15 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
151161
if (!options_parser.FromString(text)) {
152162
return options_parser.status();
153163
}
164+
// From our benchmarks we figured zstd:3 gives the best trade-off for both the
165+
// compression and decompression speed.
166+
if (text == "default" ||
167+
(!absl::StrContains(compressor_text, "uncompressed") &&
168+
!absl::StrContains(compressor_text, "brotli") &&
169+
!absl::StrContains(compressor_text, "snappy") &&
170+
!absl::StrContains(compressor_text, "zstd"))) {
171+
absl::StrAppend(&compressor_text, ",", kArrayRecordDefaultCompression);
172+
}
154173
// max_parallelism is set after options_parser.FromString()
155174
if (max_parallelism > 0) {
156175
options.set_max_parallelism(max_parallelism);
@@ -167,13 +186,48 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
167186
return options;
168187
}
169188

189+
std::string ArrayRecordWriterBase::Options::ToString() const {
190+
std::string option;
191+
absl::StrAppend(&option, "group_size:", this->group_size_,
192+
",transpose:", this->transpose_ ? "true" : "false",
193+
",pad_to_block_boundary:",
194+
this->pad_to_block_boundary_ ? "true" : "false");
195+
if (this->transpose_) {
196+
absl::StrAppend(&option,
197+
",transpose_bucket_size:", this->transpose_bucket_size_);
198+
}
199+
switch (this->compressor_options().compression_type()) {
200+
case riegeli::CompressionType::kNone:
201+
absl::StrAppend(&option, ",uncompressed");
202+
break;
203+
case riegeli::CompressionType::kBrotli:
204+
absl::StrAppend(
205+
&option, ",brotli:", this->compressor_options().compression_level());
206+
break;
207+
case riegeli::CompressionType::kZstd:
208+
absl::StrAppend(&option,
209+
",zstd:", this->compressor_options().compression_level());
210+
break;
211+
case riegeli::CompressionType::kSnappy:
212+
absl::StrAppend(&option, ",snappy");
213+
break;
214+
}
215+
if (this->compressor_options().window_log().has_value()) {
216+
absl::StrAppend(&option, ",window_log:",
217+
this->compressor_options().window_log().value());
218+
}
219+
if (max_parallelism_.has_value()) {
220+
absl::StrAppend(&option, ",max_parallelism:", max_parallelism_.value());
221+
}
222+
return option;
223+
}
224+
170225
// Thread compatible callback guarded by SequencedChunkWriter's mutex.
171226
class ArrayRecordWriterBase::SubmitChunkCallback
172227
: public SequencedChunkWriterBase::SubmitChunkCallback {
173228
public:
174229
explicit SubmitChunkCallback(const ArrayRecordWriterBase::Options options)
175-
: compression_options_(options.compressor_options()),
176-
max_parallelism_(options.max_parallelism().value()) {
230+
: options_(options), max_parallelism_(options.max_parallelism().value()) {
177231
constexpr uint64_t kDefaultDecodedDataSize = (1 << 20);
178232
last_decoded_data_size_.store(kDefaultDecodedDataSize);
179233
}
@@ -200,7 +254,7 @@ class ArrayRecordWriterBase::SubmitChunkCallback
200254
void WriteFooterAndPostscript(SequencedChunkWriterBase* writer);
201255

202256
private:
203-
const CompressorOptions compression_options_;
257+
const Options options_;
204258

205259
absl::Mutex mu_;
206260
const int32_t max_parallelism_;
@@ -456,8 +510,11 @@ ArrayRecordWriterBase::SubmitChunkCallback::CreateFooterChunk() {
456510
footer_metadata.mutable_array_record_metadata()->set_num_chunks(
457511
array_footer_.size());
458512
footer_metadata.mutable_array_record_metadata()->set_num_records(num_records);
513+
footer_metadata.mutable_array_record_metadata()->set_writer_options(
514+
options_.ToString());
459515
// Perhaps we can compress the footer
460-
return ChunkFromSpan(compression_options_, absl::MakeConstSpan(array_footer_),
516+
return ChunkFromSpan(options_.compressor_options(),
517+
absl::MakeConstSpan(array_footer_),
461518
std::optional(footer_metadata));
462519
}
463520

cpp/array_record_writer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
8383
public:
8484
class Options {
8585
public:
86-
Options() {}
86+
Options();
8787

8888
// Parses options from text:
8989
// ```
@@ -268,6 +268,9 @@ class ArrayRecordWriterBase : public riegeli::Object {
268268
return metadata_;
269269
}
270270

271+
// Serialize the options to a string.
272+
std::string ToString() const;
273+
271274
private:
272275
int32_t group_size_ = kDefaultGroupSize;
273276
riegeli::CompressorOptions compressor_options_;

cpp/array_record_writer_test.cc

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,59 @@ INSTANTIATE_TEST_SUITE_P(
193193
testing::Bool(), testing::Bool(), testing::Bool()));
194194

195195
TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
196+
{
197+
auto option = ArrayRecordWriterBase::Options();
198+
EXPECT_EQ(option.group_size(),
199+
ArrayRecordWriterBase::Options::kDefaultGroupSize);
200+
EXPECT_FALSE(option.transpose());
201+
EXPECT_EQ(option.max_parallelism(), std::nullopt);
202+
EXPECT_EQ(option.compressor_options().compression_type(),
203+
riegeli::CompressionType::kZstd);
204+
EXPECT_EQ(option.compressor_options().compression_level(), 3);
205+
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
206+
EXPECT_FALSE(option.pad_to_block_boundary());
207+
}
196208
{
197209
auto option = ArrayRecordWriterBase::Options::FromString("").value();
198210
EXPECT_EQ(option.group_size(),
199211
ArrayRecordWriterBase::Options::kDefaultGroupSize);
200212
EXPECT_FALSE(option.transpose());
201213
EXPECT_EQ(option.max_parallelism(), std::nullopt);
202214
EXPECT_EQ(option.compressor_options().compression_type(),
203-
riegeli::CompressionType::kBrotli);
215+
riegeli::CompressionType::kZstd);
216+
EXPECT_EQ(option.compressor_options().compression_level(), 3);
217+
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
218+
EXPECT_FALSE(option.pad_to_block_boundary());
219+
220+
EXPECT_EQ(option.ToString(),
221+
"group_size:65536,"
222+
"transpose:false,"
223+
"pad_to_block_boundary:false,"
224+
"zstd:3,"
225+
"window_log:20");
226+
EXPECT_TRUE(
227+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
228+
}
229+
{
230+
auto option = ArrayRecordWriterBase::Options::FromString("default").value();
231+
EXPECT_EQ(option.group_size(),
232+
ArrayRecordWriterBase::Options::kDefaultGroupSize);
233+
EXPECT_FALSE(option.transpose());
234+
EXPECT_EQ(option.max_parallelism(), std::nullopt);
235+
EXPECT_EQ(option.compressor_options().compression_type(),
236+
riegeli::CompressionType::kZstd);
237+
EXPECT_EQ(option.compressor_options().compression_level(), 3);
238+
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
204239
EXPECT_FALSE(option.pad_to_block_boundary());
240+
241+
EXPECT_EQ(option.ToString(),
242+
"group_size:65536,"
243+
"transpose:false,"
244+
"pad_to_block_boundary:false,"
245+
"zstd:3,"
246+
"window_log:20");
247+
EXPECT_TRUE(
248+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
205249
}
206250
{
207251
auto option = ArrayRecordWriterBase::Options::FromString(
@@ -210,10 +254,42 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
210254
EXPECT_EQ(option.group_size(), 32);
211255
EXPECT_TRUE(option.transpose());
212256
EXPECT_EQ(option.max_parallelism(), std::nullopt);
257+
EXPECT_EQ(option.compressor_options().compression_type(),
258+
riegeli::CompressionType::kZstd);
259+
EXPECT_EQ(option.compressor_options().window_log(), 20);
260+
EXPECT_FALSE(option.pad_to_block_boundary());
261+
262+
EXPECT_EQ(option.ToString(),
263+
"group_size:32,"
264+
"transpose:true,"
265+
"pad_to_block_boundary:false,"
266+
"transpose_bucket_size:256,"
267+
"zstd:3,"
268+
"window_log:20");
269+
EXPECT_TRUE(
270+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
271+
}
272+
{
273+
auto option = ArrayRecordWriterBase::Options::FromString(
274+
"brotli:6,group_size:32,transpose,window_log:25")
275+
.value();
276+
EXPECT_EQ(option.group_size(), 32);
277+
EXPECT_TRUE(option.transpose());
278+
EXPECT_EQ(option.max_parallelism(), std::nullopt);
213279
EXPECT_EQ(option.compressor_options().compression_type(),
214280
riegeli::CompressionType::kBrotli);
215-
EXPECT_EQ(option.compressor_options().brotli_window_log(), 20);
281+
EXPECT_EQ(option.compressor_options().window_log(), 25);
216282
EXPECT_FALSE(option.pad_to_block_boundary());
283+
284+
EXPECT_EQ(option.ToString(),
285+
"group_size:32,"
286+
"transpose:true,"
287+
"pad_to_block_boundary:false,"
288+
"transpose_bucket_size:256,"
289+
"brotli:6,"
290+
"window_log:25");
291+
EXPECT_TRUE(
292+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
217293
}
218294
{
219295
auto option = ArrayRecordWriterBase::Options::FromString(
@@ -224,9 +300,19 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
224300
EXPECT_EQ(option.max_parallelism(), std::nullopt);
225301
EXPECT_EQ(option.compressor_options().compression_type(),
226302
riegeli::CompressionType::kZstd);
227-
EXPECT_EQ(option.compressor_options().zstd_window_log(), 20);
303+
EXPECT_EQ(option.compressor_options().window_log(), 20);
228304
EXPECT_EQ(option.compressor_options().compression_level(), 5);
229305
EXPECT_FALSE(option.pad_to_block_boundary());
306+
307+
EXPECT_EQ(option.ToString(),
308+
"group_size:32,"
309+
"transpose:true,"
310+
"pad_to_block_boundary:false,"
311+
"transpose_bucket_size:256,"
312+
"zstd:5,"
313+
"window_log:20");
314+
EXPECT_TRUE(
315+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
230316
}
231317
{
232318
auto option = ArrayRecordWriterBase::Options::FromString(
@@ -239,6 +325,34 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
239325
EXPECT_EQ(option.compressor_options().compression_type(),
240326
riegeli::CompressionType::kNone);
241327
EXPECT_TRUE(option.pad_to_block_boundary());
328+
329+
EXPECT_EQ(option.ToString(),
330+
"group_size:65536,"
331+
"transpose:false,"
332+
"pad_to_block_boundary:true,"
333+
"uncompressed");
334+
EXPECT_TRUE(
335+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
336+
}
337+
{
338+
auto option = ArrayRecordWriterBase::Options::FromString(
339+
"snappy,pad_to_block_boundary:true")
340+
.value();
341+
EXPECT_EQ(option.group_size(),
342+
ArrayRecordWriterBase::Options::kDefaultGroupSize);
343+
EXPECT_FALSE(option.transpose());
344+
EXPECT_EQ(option.max_parallelism(), std::nullopt);
345+
EXPECT_EQ(option.compressor_options().compression_type(),
346+
riegeli::CompressionType::kSnappy);
347+
EXPECT_TRUE(option.pad_to_block_boundary());
348+
349+
EXPECT_EQ(option.ToString(),
350+
"group_size:65536,"
351+
"transpose:false,"
352+
"pad_to_block_boundary:true,"
353+
"snappy");
354+
EXPECT_TRUE(
355+
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
242356
}
243357
}
244358

cpp/layout.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ message RiegeliFooterMetadata {
8282
optional uint32 version = 1;
8383
optional uint64 num_chunks = 2;
8484
optional uint64 num_records = 3;
85+
// Writer options for debugging purposes.
86+
optional string writer_options = 4;
8587
}
8688

8789
oneof metadata {

0 commit comments

Comments
 (0)