From 0d2178202ded16543de5e7ab762f271bba39720d Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 17 Apr 2024 15:36:51 +0300 Subject: [PATCH 1/8] table: Use smp::all_cpus() to iterate over all CPUs locally Currently it uses irange(0, smp::count0), but seastar provides convenient helper call for the very same range object. Signed-off-by: Pavel Emelyanov --- replica/table.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replica/table.cc b/replica/table.cc index e26831a7937a..8daae7b9c99f 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2286,7 +2286,7 @@ future<> table::snapshot_on_all_shards(sharded& sharded_db, const glob std::vector file_sets; file_sets.reserve(smp::count); - co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&] (unsigned shard) -> future<> { + co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> { file_sets.emplace_back(co_await smp::submit_to(shard, [&] { return table_shards->take_snapshot(sharded_db.local(), jsondir); })); From ad1a9d4c111a02c1ca096126f658d55f5375da3b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 17 Apr 2024 15:46:25 +0300 Subject: [PATCH 2/8] sstables: Move parallel_for_each_restricted to directory_semaphore In order not to make sstable_directory mess with private members of this class. Next patch will also make use of this new method. Signed-off-by: Pavel Emelyanov --- sstables/sstable_directory.cc | 14 +++++++------- sstables/sstable_directory.hh | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index f60761a62e44..efc458aff7b6 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -350,7 +350,7 @@ future<> sstable_directory::filesystem_components_lister::process(sstable_direct // _descriptors is everything with a TOC. So after we remove this, what's left is // SSTables for which a TOC was not found. auto descriptors = std::move(_state->descriptors); - co_await directory.parallel_for_each_restricted(descriptors, [this, flags, &directory] (std::pair& t) { + co_await directory._manager.dir_semaphore().parallel_for_each(descriptors, [this, flags, &directory] (std::pair& t) { auto& desc = std::get<1>(t); _state->generations_found.erase(desc.generation); // This will try to pre-load this file and throw an exception if it is invalid @@ -445,7 +445,7 @@ future sstable_directory::load_foreign_sstable(foreign_sstable_o future<> sstable_directory::load_foreign_sstables(sstable_entry_descriptor_vector info_vec) { - co_await parallel_for_each_restricted(info_vec, [this] (const sstables::entry_descriptor& info) { + co_await _manager.dir_semaphore().parallel_for_each(info_vec, [this] (const sstables::entry_descriptor& info) { return load_sstable(info).then([this] (auto sst) { _unshared_local_sstables.push_back(sst); return make_ready_future<>(); @@ -519,13 +519,13 @@ sstable_directory::remove_unshared_sstables(std::vector sstable_directory::do_for_each_sstable(std::function(sstables::shared_sstable)> func) { auto sstables = std::move(_unshared_local_sstables); - co_await parallel_for_each_restricted(sstables, std::move(func)); + co_await _manager.dir_semaphore().parallel_for_each(sstables, std::move(func)); } future<> sstable_directory::filter_sstables(std::function(sstables::shared_sstable)> func) { std::vector filtered; - co_await parallel_for_each_restricted(_unshared_local_sstables, [func = std::move(func), &filtered] (sstables::shared_sstable sst) -> future<> { + co_await _manager.dir_semaphore().parallel_for_each(_unshared_local_sstables, [func = std::move(func), &filtered] (sstables::shared_sstable sst) -> future<> { auto keep = co_await func(sst); if (keep) { filtered.emplace_back(sst); @@ -537,9 +537,9 @@ sstable_directory::filter_sstables(std::function(sstables::shared_s template requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> future<> -sstable_directory::parallel_for_each_restricted(Container& c, Func func) { - co_await max_concurrent_for_each(c, _manager.dir_semaphore()._concurrency, [&] (auto& el) -> future<>{ - auto units = co_await get_units(_manager.dir_semaphore()._sem, 1); +directory_semaphore::parallel_for_each(Container& c, Func func) { + co_await max_concurrent_for_each(c, _concurrency, [&] (auto& el) -> future<>{ + auto units = co_await get_units(_sem, 1); co_await func(el); }); } diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 99aed7de219b..2ebb0919c29f 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -45,8 +45,11 @@ public: { } - friend class sstable_directory; friend class ::replica::table; // FIXME table snapshots should switch to sstable_directory + + template + requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> + future<> parallel_for_each(Container& C, Func func); }; // Handles a directory containing SSTables. It could be an auxiliary directory (like upload), @@ -174,9 +177,6 @@ private: future load_sstable(sstables::entry_descriptor desc, sstables::sstable_open_config cfg = {}) const; future load_sstable(sstables::entry_descriptor desc, process_flags flags) const; - template - requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> - future<> parallel_for_each_restricted(Container& C, Func func); future<> load_foreign_sstables(sstable_entry_descriptor_vector info_vec); // Sort the sstable according to owner From 6514c67fae9f091e95f42b87c0f62e18adfcd657 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 17 Apr 2024 16:01:29 +0300 Subject: [PATCH 3/8] sstables: Move directory_semaphore::parallel_for_each() to header It's a template and in order to use it in other .cc files it's more convenient to move it into a header file Signed-off-by: Pavel Emelyanov --- sstables/sstable_directory.cc | 10 ---------- sstables/sstable_directory.hh | 7 ++++++- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index efc458aff7b6..3ee7db043cd3 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -534,16 +534,6 @@ sstable_directory::filter_sstables(std::function(sstables::shared_s _unshared_local_sstables = std::move(filtered); } -template -requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> -future<> -directory_semaphore::parallel_for_each(Container& c, Func func) { - co_await max_concurrent_for_each(c, _concurrency, [&] (auto& el) -> future<>{ - auto units = co_await get_units(_sem, 1); - co_await func(el); - }); -} - void sstable_directory::store_phaser(utils::phased_barrier::operation op) { _operation_barrier.emplace(std::move(op)); diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 2ebb0919c29f..8d62cc82ab3f 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -49,7 +49,12 @@ public: template requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> - future<> parallel_for_each(Container& C, Func func); + future<> parallel_for_each(Container& c, Func func) { + co_await max_concurrent_for_each(c, _concurrency, [&] (auto& el) -> future<>{ + auto units = co_await get_units(_sem, 1); + co_await func(el); + }); + } }; // Handles a directory containing SSTables. It could be an auxiliary directory (like upload), From 2fced3c55797ee7c9e9eab2f4dcd50f5fad27d02 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 17 Apr 2024 15:50:45 +0300 Subject: [PATCH 4/8] table: Use directory_semaphore for rate-limited snapshot taking The table::take_snapshot() limits its parallelizm with the help of direcoty semaphore already, but implements it "by hand". There's already parallel_for_each() method on the dir.sem. class that does exactly that. Signed-off-by: Pavel Emelyanov --- replica/table.cc | 4 +--- sstables/sstable_directory.hh | 2 -- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/replica/table.cc b/replica/table.cc index 8daae7b9c99f..91346c74f0aa 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2305,13 +2305,11 @@ future table::take_snapshot(database& db, sstring json auto table_names = std::make_unique>(); co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); }); - co_await max_concurrent_for_each(tables, db.get_sharded_sst_dir_semaphore().local()._concurrency, [&db, &jsondir, &table_names] (sstables::shared_sstable sstable) { + co_await db.get_sharded_sst_dir_semaphore().local().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) { table_names->insert(sstable->component_basename(sstables::component_type::Data)); - return with_semaphore(db.get_sharded_sst_dir_semaphore().local()._sem, 1, [&jsondir, sstable] { return io_check([sstable, &dir = jsondir] { return sstable->snapshot(dir); }); - }); }); co_await io_check(sync_directory, jsondir); co_return make_foreign(std::move(table_names)); diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 8d62cc82ab3f..2b7490a33b8a 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -45,8 +45,6 @@ public: { } - friend class ::replica::table; // FIXME table snapshots should switch to sstable_directory - template requires std::is_invocable_r_v, Func, typename std::ranges::range_value_t&> future<> parallel_for_each(Container& c, Func func) { From 7e7dd2649b2153a3c1a3a1f4ce6614d1593e4e0f Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 17 Apr 2024 15:57:05 +0300 Subject: [PATCH 5/8] table: Indentation fix after previous patch Signed-off-by: Pavel Emelyanov --- replica/table.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/replica/table.cc b/replica/table.cc index 91346c74f0aa..100841628f50 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2307,9 +2307,9 @@ future table::take_snapshot(database& db, sstring json co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); }); co_await db.get_sharded_sst_dir_semaphore().local().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) { table_names->insert(sstable->component_basename(sstables::component_type::Data)); - return io_check([sstable, &dir = jsondir] { - return sstable->snapshot(dir); - }); + return io_check([sstable, &dir = jsondir] { + return sstable->snapshot(dir); + }); }); co_await io_check(sync_directory, jsondir); co_return make_foreign(std::move(table_names)); From be5bc38cde1279ea68ed46426e6e4d97868627bc Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 17 Apr 2024 15:56:58 +0300 Subject: [PATCH 6/8] table: Use directory semaphore from sstables manager It's natural for a table to itarate over its sstables, get the semaphore from the manager of sstables, not from database. Signed-off-by: Pavel Emelyanov --- replica/table.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replica/table.cc b/replica/table.cc index 100841628f50..c611b52dcd42 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2305,7 +2305,7 @@ future table::take_snapshot(database& db, sstring json auto table_names = std::make_unique>(); co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); }); - co_await db.get_sharded_sst_dir_semaphore().local().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) { + co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) { table_names->insert(sstable->component_basename(sstables::component_type::Data)); return io_check([sstable, &dir = jsondir] { return sstable->snapshot(dir); From 53909da39065af821ee97e08fb2abf3630e23ae6 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 17 Apr 2024 15:59:09 +0300 Subject: [PATCH 7/8] database: Don't reference directory_semaphore It was only used by table taking snapshot code. Now it uses sstables manager's reference and database can stop carrying it around. Signed-off-by: Pavel Emelyanov --- replica/database.cc | 1 - replica/database.hh | 7 ------- 2 files changed, 8 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index bf8ade4377b6..3dfc90b80e0e 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -378,7 +378,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat , _feat(feat) , _shared_token_metadata(stm) , _wasm(wasm) - , _sst_dir_semaphore(sst_dir_sem) , _stop_barrier(std::move(barrier)) , _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); }) , _memtable_flush_static_shares_observer(cfg.memtable_flush_static_shares.observe(_update_memtable_flush_static_shares_action.make_observer())) diff --git a/replica/database.hh b/replica/database.hh index 6840f8680d85..4ffa8f76db69 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1511,8 +1511,6 @@ private: const locator::shared_token_metadata& _shared_token_metadata; wasm::manager& _wasm; - sharded& _sst_dir_semaphore; - utils::cross_shard_barrier _stop_barrier; db::rate_limiter _rate_limiter; @@ -1887,11 +1885,6 @@ public: future obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); bool is_internal_query() const; - - sharded& get_sharded_sst_dir_semaphore() { - return _sst_dir_semaphore; - } - bool is_user_semaphore(const reader_concurrency_semaphore& semaphore) const; }; From ba58b71eea7832676bc4f97e14b19afcd4ed48f6 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 17 Apr 2024 16:00:50 +0300 Subject: [PATCH 8/8] database: Keep local directory_semaphore to initialize sstables managers Now database is constructed with sharded, but it no longer needs sharded, local is enough. Signed-off-by: Pavel Emelyanov --- replica/database.cc | 6 +++--- replica/database.hh | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 3dfc90b80e0e..99ae6b44b0cf 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -310,7 +310,7 @@ class db_user_types_storage : public data_dictionary::dummy_user_types_storage { }; database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, - compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sharded& sst_dir_sem, utils::cross_shard_barrier barrier) + compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sstables::directory_semaphore& sst_dir_sem, utils::cross_shard_barrier barrier) : _stats(make_lw_shared()) , _user_types(std::make_shared(*this)) , _cl_stats(std::make_unique()) @@ -370,8 +370,8 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat _cfg.compaction_rows_count_warning_threshold, _cfg.compaction_collection_elements_count_warning_threshold)) , _nop_large_data_handler(std::make_unique()) - , _user_sstables_manager(std::make_unique("user", *_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem.local(), [&stm]{ return stm.get()->get_my_id(); }, &sstm)) - , _system_sstables_manager(std::make_unique("system", *_nop_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem.local(), [&stm]{ return stm.get()->get_my_id(); })) + , _user_sstables_manager(std::make_unique("user", *_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, &sstm)) + , _system_sstables_manager(std::make_unique("system", *_nop_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); })) , _result_memory_limiter(dbcfg.available_memory / 10) , _data_listeners(std::make_unique()) , _mnotifier(mn) diff --git a/replica/database.hh b/replica/database.hh index 4ffa8f76db69..8b0f28943873 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1599,7 +1599,7 @@ public: future<> parse_system_tables(distributed&, sharded&); database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, - compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sharded& sst_dir_sem, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */); + compaction_manager& cm, sstables::storage_manager& sstm, wasm::manager& wasm, sstables::directory_semaphore& sst_dir_sem, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */); database(database&&) = delete; ~database();