Skip to content

Commit

Permalink
Merge 'Sanitize sstables::directory_semaphore usage' from Pavel Emely…
Browse files Browse the repository at this point in the history
…anov

The semaphore in question is used to limit parallelism of manipulations with table's sstables. It's currently used in two places -- sstable_directory (mainly on boot) and by table::take_snapshot() to take snapshot. For the latter, there's also a database -> sharded<directory_semaphore> reference.

This PR sanitizes the semaphore usage. The results are
- directory_semaphore no longer needs to friend several classes that mess with its internals
- database no longer references directory_semaphore

Closes scylladb#18281

* github.com:scylladb/scylladb:
  database: Keep local directory_semaphore to initialize sstables managers
  database: Don't reference directory_semaphore
  table: Use directory semaphore from sstables manager
  table: Indentation fix after previous patch
  table: Use directory_semaphore for rate-limited snapshot taking
  sstables: Move directory_semaphore::parallel_for_each() to header
  sstables: Move parallel_for_each_restricted to directory_semaphore
  table: Use smp::all_cpus() to iterate over all CPUs locally
  • Loading branch information
denesb committed Apr 23, 2024
2 parents ab4de1f + ba58b71 commit 5a1e3b2
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 37 deletions.
7 changes: 3 additions & 4 deletions replica/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class db_user_types_storage : public data_dictionary::dummy_user_types_storage {
};

database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sharded<sstables::directory_semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier)
compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sstables::directory_semaphore& sst_dir_sem, utils::cross_shard_barrier barrier)
: _stats(make_lw_shared<db_stats>())
, _user_types(std::make_shared<db_user_types_storage>(*this))
, _cl_stats(std::make_unique<cell_locker_stats>())
Expand Down Expand Up @@ -371,15 +371,14 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
_cfg.compaction_rows_count_warning_threshold,
_cfg.compaction_collection_elements_count_warning_threshold))
, _nop_large_data_handler(std::make_unique<db::nop_large_data_handler>())
, _user_sstables_manager(std::make_unique<sstables::sstables_manager>("user", *_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem.local(), [&stm]{ return stm.get()->get_my_id(); }, &sstm))
, _system_sstables_manager(std::make_unique<sstables::sstables_manager>("system", *_nop_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem.local(), [&stm]{ return stm.get()->get_my_id(); }))
, _user_sstables_manager(std::make_unique<sstables::sstables_manager>("user", *_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, &sstm))
, _system_sstables_manager(std::make_unique<sstables::sstables_manager>("system", *_nop_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }))
, _result_memory_limiter(dbcfg.available_memory / 10)
, _data_listeners(std::make_unique<db::data_listeners>())
, _mnotifier(mn)
, _feat(feat)
, _shared_token_metadata(stm)
, _wasm(wasm)
, _sst_dir_semaphore(sst_dir_sem)
, _stop_barrier(std::move(barrier))
, _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); })
, _memtable_flush_static_shares_observer(cfg.memtable_flush_static_shares.observe(_update_memtable_flush_static_shares_action.make_observer()))
Expand Down
9 changes: 1 addition & 8 deletions replica/database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -1511,8 +1511,6 @@ private:
const locator::shared_token_metadata& _shared_token_metadata;
wasm::manager& _wasm;

sharded<sstables::directory_semaphore>& _sst_dir_semaphore;

utils::cross_shard_barrier _stop_barrier;

db::rate_limiter _rate_limiter;
Expand Down Expand Up @@ -1601,7 +1599,7 @@ public:
future<> parse_system_tables(distributed<service::storage_proxy>&, sharded<db::system_keyspace>&);

database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sharded<sstables::directory_semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sstables::directory_semaphore& sst_dir_sem, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
database(database&&) = delete;
~database();

Expand Down Expand Up @@ -1887,11 +1885,6 @@ public:
future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);

bool is_internal_query() const;

sharded<sstables::directory_semaphore>& get_sharded_sst_dir_semaphore() {
return _sst_dir_semaphore;
}

bool is_user_semaphore(const reader_concurrency_semaphore& semaphore) const;
};

Expand Down
10 changes: 4 additions & 6 deletions replica/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2286,7 +2286,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
std::vector<table::snapshot_file_set> file_sets;
file_sets.reserve(smp::count);

co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&] (unsigned shard) -> future<> {
co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
return table_shards->take_snapshot(sharded_db.local(), jsondir);
}));
Expand All @@ -2305,12 +2305,10 @@ future<table::snapshot_file_set> table::take_snapshot(database& db, sstring json
auto table_names = std::make_unique<std::unordered_set<sstring>>();

co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
co_await max_concurrent_for_each(tables, db.get_sharded_sst_dir_semaphore().local()._concurrency, [&db, &jsondir, &table_names] (sstables::shared_sstable sstable) {
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
table_names->insert(sstable->component_basename(sstables::component_type::Data));
return with_semaphore(db.get_sharded_sst_dir_semaphore().local()._sem, 1, [&jsondir, sstable] {
return io_check([sstable, &dir = jsondir] {
return sstable->snapshot(dir);
});
return io_check([sstable, &dir = jsondir] {
return sstable->snapshot(dir);
});
});
co_await io_check(sync_directory, jsondir);
Expand Down
18 changes: 4 additions & 14 deletions sstables/sstable_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ future<> sstable_directory::filesystem_components_lister::process(sstable_direct
// _descriptors is everything with a TOC. So after we remove this, what's left is
// SSTables for which a TOC was not found.
auto descriptors = std::move(_state->descriptors);
co_await directory.parallel_for_each_restricted(descriptors, [this, flags, &directory] (std::pair<const generation_type, sstables::entry_descriptor>& t) {
co_await directory._manager.dir_semaphore().parallel_for_each(descriptors, [this, flags, &directory] (std::pair<const generation_type, sstables::entry_descriptor>& t) {
auto& desc = std::get<1>(t);
_state->generations_found.erase(desc.generation);
// This will try to pre-load this file and throw an exception if it is invalid
Expand Down Expand Up @@ -447,7 +447,7 @@ future<shared_sstable> sstable_directory::load_foreign_sstable(foreign_sstable_o

future<>
sstable_directory::load_foreign_sstables(sstable_entry_descriptor_vector info_vec) {
co_await parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) {
co_await _manager.dir_semaphore().parallel_for_each(info_vec, [this] (const sstables::entry_descriptor& info) {
return load_sstable(info).then([this] (auto sst) {
_unshared_local_sstables.push_back(sst);
return make_ready_future<>();
Expand Down Expand Up @@ -521,13 +521,13 @@ sstable_directory::remove_unshared_sstables(std::vector<sstables::shared_sstable
future<>
sstable_directory::do_for_each_sstable(std::function<future<>(sstables::shared_sstable)> func) {
auto sstables = std::move(_unshared_local_sstables);
co_await parallel_for_each_restricted(sstables, std::move(func));
co_await _manager.dir_semaphore().parallel_for_each(sstables, std::move(func));
}

future<>
sstable_directory::filter_sstables(std::function<future<bool>(sstables::shared_sstable)> func) {
std::vector<sstables::shared_sstable> filtered;
co_await parallel_for_each_restricted(_unshared_local_sstables, [func = std::move(func), &filtered] (sstables::shared_sstable sst) -> future<> {
co_await _manager.dir_semaphore().parallel_for_each(_unshared_local_sstables, [func = std::move(func), &filtered] (sstables::shared_sstable sst) -> future<> {
auto keep = co_await func(sst);
if (keep) {
filtered.emplace_back(sst);
Expand All @@ -536,16 +536,6 @@ sstable_directory::filter_sstables(std::function<future<bool>(sstables::shared_s
_unshared_local_sstables = std::move(filtered);
}

template <std::ranges::range Container, typename Func>
requires std::is_invocable_r_v<future<>, Func, typename std::ranges::range_value_t<Container>&>
future<>
sstable_directory::parallel_for_each_restricted(Container& c, Func func) {
co_await max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [&] (auto& el) -> future<>{
auto units = co_await get_units(_manager.dir_semaphore()._sem, 1);
co_await func(el);
});
}

void
sstable_directory::store_phaser(utils::phased_barrier::operation op) {
_operation_barrier.emplace(std::move(op));
Expand Down
13 changes: 8 additions & 5 deletions sstables/sstable_directory.hh
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ public:
{
}

friend class sstable_directory;
friend class ::replica::table; // FIXME table snapshots should switch to sstable_directory
template <std::ranges::range Container, typename Func>
requires std::is_invocable_r_v<future<>, Func, typename std::ranges::range_value_t<Container>&>
future<> parallel_for_each(Container& c, Func func) {
co_await max_concurrent_for_each(c, _concurrency, [&] (auto& el) -> future<>{
auto units = co_await get_units(_sem, 1);
co_await func(el);
});
}
};

// Handles a directory containing SSTables. It could be an auxiliary directory (like upload),
Expand Down Expand Up @@ -174,9 +180,6 @@ private:
future<sstables::shared_sstable> load_sstable(sstables::entry_descriptor desc, sstables::sstable_open_config cfg = {}) const;
future<sstables::shared_sstable> load_sstable(sstables::entry_descriptor desc, process_flags flags) const;

template <std::ranges::range Container, typename Func>
requires std::is_invocable_r_v<future<>, Func, typename std::ranges::range_value_t<Container>&>
future<> parallel_for_each_restricted(Container& C, Func func);
future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec);

// Sort the sstable according to owner
Expand Down

0 comments on commit 5a1e3b2

Please sign in to comment.