Skip to content

Commit 5666597

Browse files
Daniel Munozfacebook-github-bot
authored andcommitted
Syncrhonize access to optionalSectionsCache_
Summary: To make the class thread safe. Reviewed By: helfman Differential Revision: D56763840
1 parent 3acdb4b commit 5666597

File tree

4 files changed

+63
-41
lines changed

4 files changed

+63
-41
lines changed

dwio/nimble/common/tests/TestUtils.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "dwio/nimble/common/Types.h"
2424
#include "dwio/nimble/common/Vector.h"
2525
#include "folly/Random.h"
26+
#include "folly/Synchronized.h"
2627
#include "velox/common/file/File.h"
2728
#include "velox/common/memory/Memory.h"
2829

@@ -291,12 +292,12 @@ class InMemoryTrackableReadFile final : public velox::ReadFile {
291292

292293
std::string_view pread(uint64_t offset, uint64_t length, void* buf)
293294
const final {
294-
chunks_.push_back({offset, length});
295+
chunks_.wlock()->push_back({offset, length});
295296
return file_.pread(offset, length, buf);
296297
}
297298

298299
std::string pread(uint64_t offset, uint64_t length) const final {
299-
chunks_.push_back({offset, length});
300+
chunks_.wlock()->push_back({offset, length});
300301
return file_.pread(offset, length);
301302
}
302303

@@ -314,7 +315,7 @@ class InMemoryTrackableReadFile final : public velox::ReadFile {
314315
const auto& region = regions[i];
315316
auto& output = iobufs[i];
316317
if (shouldProduceChainedBuffers_) {
317-
chunks_.push_back({region.offset, region.length});
318+
chunks_.wlock()->push_back({region.offset, region.length});
318319
uint64_t splitPoint = region.length / 2;
319320
output = folly::IOBuf(folly::IOBuf::CREATE, splitPoint);
320321
file_.pread(region.offset, splitPoint, output.writableData());
@@ -350,12 +351,12 @@ class InMemoryTrackableReadFile final : public velox::ReadFile {
350351
return file_.shouldCoalesce();
351352
}
352353

353-
const std::vector<Chunk>& chunks() {
354-
return chunks_;
354+
std::vector<Chunk> chunks() {
355+
return *chunks_.rlock();
355356
}
356357

357358
void resetChunks() {
358-
chunks_.clear();
359+
chunks_.wlock()->clear();
359360
}
360361

361362
std::string getName() const override {
@@ -369,7 +370,7 @@ class InMemoryTrackableReadFile final : public velox::ReadFile {
369370
private:
370371
velox::InMemoryReadFile file_;
371372
bool shouldProduceChainedBuffers_;
372-
mutable std::vector<Chunk> chunks_;
373+
mutable folly::Synchronized<std::vector<Chunk>> chunks_;
373374
};
374375

375376
} // namespace facebook::nimble::testing

dwio/nimble/tablet/TabletReader.cpp

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,9 @@ TabletReader::TabletReader(
261261
/* stripeIndex */ 0,
262262
std::make_unique<MetadataBuffer>(memoryPool, stripeGroup));
263263
initStripes();
264+
auto optionalSectionsCacheLock = optionalSectionsCache_.wlock();
264265
for (auto& pair : optionalSections) {
265-
optionalSectionsCache_.insert(
266+
optionalSectionsCacheLock->insert(
266267
{pair.first,
267268
std::make_unique<MetadataBuffer>(memoryPool, pair.second)});
268269
}
@@ -407,7 +408,7 @@ TabletReader::TabletReader(
407408
sectionOffset - offset,
408409
sectionSize,
409410
sectionCompressionType);
410-
optionalSectionsCache_.insert({preload, std::move(metadata)});
411+
optionalSectionsCache_.wlock()->insert({preload, std::move(metadata)});
411412
}
412413
}
413414
if (!mustRead.empty()) {
@@ -421,7 +422,7 @@ TabletReader::TabletReader(
421422
const std::string preload{mustRead[i].label};
422423
auto metadata = std::make_unique<MetadataBuffer>(
423424
memoryPool_, iobuf, std::get<2>(optionalSections_[preload]));
424-
optionalSectionsCache_.insert({preload, std::move(metadata)});
425+
optionalSectionsCache_.wlock()->insert({preload, std::move(metadata)});
425426
}
426427
}
427428
}
@@ -609,14 +610,17 @@ std::optional<Section> TabletReader::loadOptionalSection(
609610
const std::string& name,
610611
bool keepCache) const {
611612
NIMBLE_CHECK(!name.empty(), "Optional section name cannot be empty.");
612-
auto itCache = optionalSectionsCache_.find(name);
613-
if (itCache != optionalSectionsCache_.end()) {
614-
if (keepCache) {
615-
return Section{MetadataBuffer{*itCache->second}};
616-
} else {
617-
auto metadata = std::move(itCache->second);
618-
optionalSectionsCache_.erase(itCache);
619-
return Section{std::move(*metadata)};
613+
{
614+
auto optionalSectionsCache = optionalSectionsCache_.wlock();
615+
auto itCache = optionalSectionsCache->find(name);
616+
if (itCache != optionalSectionsCache->end()) {
617+
if (keepCache) {
618+
return Section{MetadataBuffer{*itCache->second}};
619+
} else {
620+
auto metadata = std::move(itCache->second);
621+
optionalSectionsCache->erase(itCache);
622+
return Section{std::move(*metadata)};
623+
}
620624
}
621625
}
622626

dwio/nimble/tablet/TabletReader.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "dwio/nimble/common/Checksum.h"
2020
#include "dwio/nimble/common/Vector.h"
2121
#include "folly/Range.h"
22+
#include "folly/Synchronized.h"
2223
#include "folly/io/IOBuf.h"
2324
#include "velox/common/file/File.h"
2425
#include "velox/common/memory/Memory.h"
@@ -289,7 +290,8 @@ class TabletReader {
289290
std::string,
290291
std::tuple<uint64_t, uint32_t, CompressionType>>
291292
optionalSections_;
292-
mutable std::unordered_map<std::string, std::unique_ptr<MetadataBuffer>>
293+
mutable folly::Synchronized<
294+
std::unordered_map<std::string, std::unique_ptr<MetadataBuffer>>>
293295
optionalSectionsCache_;
294296

295297
friend class TabletHelper;

dwio/nimble/tablet/tests/TabletTests.cpp

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
#include <folly/executors/CPUThreadPoolExecutor.h>
1716
#include <gtest/gtest.h>
1817
#include <algorithm>
1918
#include <iterator>
@@ -28,9 +27,11 @@
2827
#include "dwio/nimble/tablet/TabletWriter.h"
2928
#include "folly/FileUtil.h"
3029
#include "folly/Random.h"
30+
#include "folly/executors/CPUThreadPoolExecutor.h"
3131
#include "folly/experimental/coro/Generator.h"
3232
#include "velox/common/file/File.h"
3333
#include "velox/common/memory/Memory.h"
34+
#include "velox/dwio/common/ExecutorBarrier.h"
3435

3536
using namespace ::facebook;
3637

@@ -513,14 +514,14 @@ TEST(TabletTests, OptionalSections) {
513514
const std::string& content = random;
514515
tabletWriter.writeOptionalSection("section1", content);
515516
}
517+
std::string zeroes;
516518
{
517-
std::string content;
518-
content.resize(randomSize);
519-
for (auto i = 0; i < content.size(); ++i) {
520-
content[i] = '\0';
519+
zeroes.resize(randomSize);
520+
for (auto i = 0; i < zeroes.size(); ++i) {
521+
zeroes[i] = '\0';
521522
}
522523

523-
tabletWriter.writeOptionalSection("section2", content);
524+
tabletWriter.writeOptionalSection("section2", zeroes);
524525
}
525526
{
526527
std::string content;
@@ -529,30 +530,44 @@ TEST(TabletTests, OptionalSections) {
529530

530531
tabletWriter.close();
531532

533+
folly::CPUThreadPoolExecutor executor{5};
534+
facebook::velox::dwio::common::ExecutorBarrier barrier{executor};
535+
532536
for (auto useChaniedBuffers : {false, true}) {
533537
nimble::testing::InMemoryTrackableReadFile readFile(
534538
file, useChaniedBuffers);
535539
nimble::TabletReader tablet{*pool, &readFile};
536540

537-
auto section = tablet.loadOptionalSection("section1");
538-
ASSERT_TRUE(section.has_value());
539-
ASSERT_EQ(random, section->content());
541+
auto check1 = [&]() {
542+
auto section = tablet.loadOptionalSection("section1");
543+
ASSERT_TRUE(section.has_value());
544+
ASSERT_EQ(random, section->content());
545+
};
540546

541-
std::string expectedContent;
542-
expectedContent.resize(randomSize);
543-
for (auto i = 0; i < expectedContent.size(); ++i) {
544-
expectedContent[i] = '\0';
545-
}
546-
section = tablet.loadOptionalSection("section2");
547-
ASSERT_TRUE(section.has_value());
548-
ASSERT_EQ(expectedContent, section->content());
547+
auto check2 = [&]() {
548+
auto section = tablet.loadOptionalSection("section2");
549+
ASSERT_TRUE(section.has_value());
550+
ASSERT_EQ(zeroes, section->content());
551+
};
549552

550-
section = tablet.loadOptionalSection("section3");
551-
ASSERT_TRUE(section.has_value());
552-
ASSERT_EQ(std::string(), section->content());
553+
auto check3 = [&, empty = std::string()]() {
554+
auto section = tablet.loadOptionalSection("section3");
555+
ASSERT_TRUE(section.has_value());
556+
ASSERT_EQ(empty, section->content());
557+
};
553558

554-
section = tablet.loadOptionalSection("section4");
555-
ASSERT_FALSE(section.has_value());
559+
auto check4 = [&]() {
560+
auto section = tablet.loadOptionalSection("section4");
561+
ASSERT_FALSE(section.has_value());
562+
};
563+
564+
for (int i = 0; i < 10; ++i) {
565+
barrier.add(check1);
566+
barrier.add(check2);
567+
barrier.add(check3);
568+
barrier.add(check4);
569+
}
570+
barrier.waitAll();
556571
}
557572
}
558573

0 commit comments

Comments
 (0)