Skip to content

Commit eea9f0d

Browse files
ArrayRecord Team authored and copybara-github committed
Reuse the same reader across ReadChunk calls so that prefetched blocks are not discarded.
In the sequential, non-parallel path (ReadRecord->ReadAheadFromBuffer->ReadChunk), ReadChunk reads the whole chunk into memory. Most readers will round up the read to the block size and thereby prefetch some data from the next chunk. Currently, ReadChunk recreates the reader each time, so the previously prefetched data is lost and must be read again. Instead, use get_backing_reader() directly each time. We can do this since ReadChunk is only called in a single-threaded context (Initialize, ReadRecord (parallelism disabled)), so there cannot be any concurrent access to the underlying reader.

ReadTrace (before):
```
offset=64, size=1M <-- 1st chunk
offset=1M, size=1M
...
offset=8M, size=1M <-- block aligned read crosses chunk boundary
offset=8.3M, size=0.7M <-- redundant read for 2nd chunk
offset=9M, size=1M
...
```

ReadTrace (after):
```
offset=64, size=1M <-- 1st chunk
offset=1M, size=1M
...
offset=8M, size=1M <-- block aligned read crosses chunk boundary and is reused for the 2nd chunk
offset=9M, size=1M
...
```

PiperOrigin-RevId: 577312179
1 parent b1403b5 commit eea9f0d

File tree

2 files changed

+15
-15
lines changed

2 files changed

+15
-15
lines changed

cpp/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ cc_library(
168168
"@com_google_riegeli//riegeli/base:options_parser",
169169
"@com_google_riegeli//riegeli/base:status",
170170
"@com_google_riegeli//riegeli/bytes:reader",
171+
"@com_google_riegeli//riegeli/bytes:wrapped_reader",
171172
"@com_google_riegeli//riegeli/chunk_encoding:chunk_decoder",
172173
"@com_google_riegeli//riegeli/records:chunk_reader",
173174
"@com_google_riegeli//riegeli/records:record_position",

cpp/array_record_reader.cc

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ limitations under the License.
4646
#include "riegeli/base/options_parser.h"
4747
#include "riegeli/base/status.h"
4848
#include "riegeli/bytes/reader.h"
49+
#include "riegeli/bytes/wrapped_reader.h"
4950
#include "riegeli/chunk_encoding/chunk_decoder.h"
5051
#include "riegeli/records/chunk_reader.h"
5152
#include "riegeli/records/record_position.h"
@@ -70,6 +71,7 @@ using riegeli::ChunkReader;
7071
using riegeli::OptionsParser;
7172
using riegeli::Reader;
7273
using riegeli::ValueParser;
74+
using riegeli::WrappedReader;
7375

7476
template <class T>
7577
T CeilOfRatio(T x, T d) {
@@ -159,20 +161,15 @@ ArrayRecordReaderBase& ArrayRecordReaderBase::operator=(
159161
return *this;
160162
}
161163

162-
// After the first access to the underlying `riegeli::Reader`, the lazily
163-
// evaluated variables for random access are all initialized. Therefore it's
164-
// safe to access the reader from multiple threads later on, even though the
165-
// methods wasn't const.
166-
ChunkDecoder ReadChunk(const ThreadCompatibleSharedPtr<Reader>& reader,
167-
size_t pos, size_t len) {
164+
ChunkDecoder ReadChunk(Reader* mutable_reader, size_t pos, size_t len) {
168165
ChunkDecoder decoder;
169-
if (!reader->ok()) {
170-
decoder.Fail(reader->status());
166+
if (!mutable_reader->ok()) {
167+
decoder.Fail(mutable_reader->status());
171168
return decoder;
172169
}
173-
Reader* mutable_reader =
174-
const_cast<Reader*>(reinterpret_cast<const Reader*>(reader.get()));
175-
MaskedReader masked_reader(mutable_reader->NewReader(pos), len);
170+
mutable_reader->Seek(pos);
171+
MaskedReader masked_reader(std::make_unique<WrappedReader<>>(mutable_reader),
172+
len);
176173
if (!masked_reader.ok()) {
177174
decoder.Fail(masked_reader.status());
178175
return decoder;
@@ -227,7 +224,7 @@ void ArrayRecordReaderBase::Initialize() {
227224
}
228225
RiegeliPostscript postscript;
229226
auto postscript_decoder =
230-
ReadChunk(reader, size - kRiegeliBlockSize, kRiegeliBlockSize);
227+
ReadChunk(mutable_reader, size - kRiegeliBlockSize, kRiegeliBlockSize);
231228
if (!postscript_decoder.ReadRecord(postscript)) {
232229
Fail(Annotate(postscript_decoder.status(),
233230
"Failed to read RiegeliPostscript"));
@@ -245,7 +242,7 @@ void ArrayRecordReaderBase::Initialize() {
245242
}
246243
state_->footer_offset = postscript.footer_offset();
247244
footer_decoder =
248-
ReadChunk(reader, postscript.footer_offset(),
245+
ReadChunk(mutable_reader, postscript.footer_offset(),
249246
size - kRiegeliBlockSize - postscript.footer_offset());
250247

251248
if (!footer_decoder.ReadRecord(footer_metadata)) {
@@ -670,11 +667,13 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
670667
uint64_t chunk_end = std::min(state_->chunk_offsets.size(),
671668
(buffer_idx + 1) * state_->chunk_group_size);
672669
const auto reader = get_backing_reader();
670+
Reader* mutable_reader =
671+
const_cast<Reader*>(reinterpret_cast<const Reader*>(reader.get()));
673672
for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
674673
uint64_t chunk_offset = state_->chunk_offsets[chunk_idx];
675674
uint64_t chunk_end_offset = state_->ChunkEndOffset(chunk_idx);
676-
decoders.push_back(
677-
ReadChunk(reader, chunk_offset, chunk_end_offset - chunk_offset));
675+
decoders.push_back(ReadChunk(mutable_reader, chunk_offset,
676+
chunk_end_offset - chunk_offset));
678677
}
679678
state_->buffer_idx = buffer_idx;
680679
state_->current_decoders = std::move(decoders);

0 commit comments

Comments
 (0)