diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index e6b0996cc3..8d63b8b3f2 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -322,14 +322,6 @@ private:
     std::atomic<bool> _dying{false};
     gate _background_gate;
 
-    inline auto& get_sg_data(const scheduling_group& sg) {
-        return _scheduling_group_specific_data.per_scheduling_group_data[sg._id];
-    }
-
-    inline auto& get_sg_data(unsigned sg_id) {
-        return _scheduling_group_specific_data.per_scheduling_group_data[sg_id];
-    }
-
 private:
     static std::chrono::nanoseconds calculate_poll_time();
     static void block_notifier(int);
@@ -401,6 +393,7 @@ private:
     task_queue* pop_active_task_queue(sched_clock::time_point now);
     void insert_activating_task_queues();
     void account_runtime(task_queue& tq, sched_clock::duration runtime);
+    void allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id);
     future<> rename_scheduling_group_specific_data(scheduling_group sg);
     future<> init_scheduling_group(scheduling_group sg, sstring name, sstring shortname, float shares);
     future<> init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg);
diff --git a/include/seastar/core/scheduling_specific.hh b/include/seastar/core/scheduling_specific.hh
index 3b90175e4b..0a9c9d8b3d 100644
--- a/include/seastar/core/scheduling_specific.hh
+++ b/include/seastar/core/scheduling_specific.hh
@@ -37,49 +37,6 @@ namespace seastar {
 namespace internal {
 
 struct scheduling_group_specific_thread_local_data {
-    using val_ptr = std::unique_ptr<void, void (*)(void*)>;
-    using cfg_ptr = std::shared_ptr<scheduling_group_key_config>;
-
-    struct specific_val {
-        val_ptr valp;
-        cfg_ptr cfg;
-
-        specific_val() : valp(nullptr, &free), cfg(nullptr) {}
-
-        specific_val(val_ptr&& valp_, const cfg_ptr& cfg_) : valp(std::move(valp_)), cfg(cfg_) {
-            if (valp && cfg->constructor) {
-                cfg->constructor(valp.get());
-            }
-        }
-
-        ~specific_val() {
-            if (valp && cfg->destructor) {
-                cfg->destructor(valp.get());
-            }
-        }
-
-        specific_val(const specific_val& other) = delete;
-        specific_val& operator=(const specific_val& other) = delete;
-
-        specific_val(specific_val&& other) : valp(std::move(other.valp)), cfg(other.cfg) {}
-
-        specific_val& operator=(specific_val&& other) {
-            if (this != &other) {
-                valp = std::move(other.valp);
-                cfg = std::move(other.cfg);
-            }
-            return *this;
-        }
-
-        void* get() { return valp.get(); }
-
-        void rename() {
-            if (valp && cfg->rename) {
-                cfg->rename(valp.get());
-            }
-        }
-    };
-
     struct per_scheduling_group {
         bool queue_is_initialized = false;
         /**
@@ -87,16 +44,10 @@ struct scheduling_group_specific_thread_local_data {
          * data. The pointer is not use as is but is cast to a reference
          * to the appropriate type that is actually pointed to.
          */
-        std::vector<specific_val> specific_vals;
-
-        void rename() {
-            for (auto& v : specific_vals) {
-                v.rename();
-            }
-        }
+        std::vector<void*> specific_vals;
     };
     std::array<per_scheduling_group, max_scheduling_groups()> per_scheduling_group_data;
-    std::map<unsigned long, cfg_ptr> scheduling_group_key_configs;
+    std::map<unsigned long, scheduling_group_key_config> scheduling_group_key_configs;
 };
 
 #ifdef SEASTAR_BUILD_SHARED_LIBS
@@ -127,12 +78,12 @@ template <typename T>
 T* scheduling_group_get_specific_ptr(scheduling_group sg, scheduling_group_key key) noexcept {
     auto& data = internal::get_scheduling_group_specific_thread_local_data();
 #ifdef SEASTAR_DEBUG
-    assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index);
+    assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index);
 #endif
     auto sg_id = internal::scheduling_group_index(sg);
     if (__builtin_expect(sg_id < data.per_scheduling_group_data.size() &&
             data.per_scheduling_group_data[sg_id].queue_is_initialized, true)) {
-        return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get());
+        return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]);
     }
     return nullptr;
 }
@@ -172,9 +123,9 @@ T& scheduling_group_get_specific(scheduling_group_key key) noexcept {
     // return a reference to an element whose queue_is_initialized is
     // false.
     auto& data = internal::get_scheduling_group_specific_thread_local_data();
-    assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index);
+    assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index);
     auto sg_id = internal::scheduling_group_index(current_scheduling_group());
-    return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get());
+    return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]);
 }
 
 /**
@@ -204,7 +155,7 @@ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer,
     auto wrapped_mapper = [key, mapper] (per_scheduling_group& psg) {
         auto id = internal::scheduling_group_key_id(key);
         return make_ready_future<typename function_traits<Mapper>::return_type>
-                (mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get())));
+                (mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id])));
     };
 
     return map_reduce(
@@ -237,7 +188,7 @@ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, schedulin
 
     auto mapper = [key] (per_scheduling_group& psg) {
         auto id = internal::scheduling_group_key_id(key);
-        return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get()));
+        return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id]));
     };
 
     return map_reduce(
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index b5593cf15d..69d9f23f37 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1066,12 +1066,23 @@ reactor::~reactor() {
     eraser(_expired_timers);
     eraser(_expired_lowres_timers);
     eraser(_expired_manual_timers);
+    auto& sg_data = _scheduling_group_specific_data;
     for (auto&& tq : _task_queues) {
         if (tq) {
+            auto& this_sg = sg_data.per_scheduling_group_data[tq->_id];
             // The following line will preserve the convention that constructor and destructor functions
             // for the per sg values are called in the context of the containing scheduling group.
             *internal::current_scheduling_group_ptr() = scheduling_group(tq->_id);
-            get_sg_data(tq->_id).specific_vals.clear();
+            for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
+                void* val = this_sg.specific_vals[key_id];
+                if (val) {
+                    if (cfg.destructor) {
+                        cfg.destructor(val);
+                    }
+                    free(val);
+                    this_sg.specific_vals[key_id] = nullptr;
+                }
+            }
         }
     }
 }
@@ -4884,25 +4895,32 @@ deallocate_scheduling_group_id(unsigned id) noexcept {
     s_used_scheduling_group_ids_bitmap.fetch_and(~(1ul << id), std::memory_order_relaxed);
 }
 
-static
-internal::scheduling_group_specific_thread_local_data::specific_val
-allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const std::shared_ptr<scheduling_group_key_config>& cfg) {
-    using val_ptr = internal::scheduling_group_specific_thread_local_data::val_ptr;
-    using specific_val = internal::scheduling_group_specific_thread_local_data::specific_val;
-
-    val_ptr valp(aligned_alloc(cfg->alignment, cfg->allocation_size), &free);
-    if (!valp) {
-        throw std::runtime_error("memory allocation failed");
+void
+reactor::allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id) {
+    auto& sg_data = _scheduling_group_specific_data;
+    auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
+    const auto& cfg = sg_data.scheduling_group_key_configs[key_id];
+    this_sg.specific_vals.resize(std::max(this_sg.specific_vals.size(), key_id+1));
+    this_sg.specific_vals[key_id] = aligned_alloc(cfg.alignment, cfg.allocation_size);
+    if (!this_sg.specific_vals[key_id]) {
+        std::abort();
+    }
+    if (cfg.constructor) {
+        cfg.constructor(this_sg.specific_vals[key_id]);
     }
-
-    return specific_val(std::move(valp), cfg);
 }
 
 future<>
 reactor::rename_scheduling_group_specific_data(scheduling_group sg) {
     return with_shared(_scheduling_group_keys_mutex, [this, sg] {
         return with_scheduling_group(sg, [this, sg] {
-            get_sg_data(sg).rename();
+            auto& sg_data = _scheduling_group_specific_data;
+            auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
+            for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
+                if (cfg.rename) {
+                    (cfg.rename)(this_sg.specific_vals[key_id]);
+                }
+            }
         });
     });
 }
@@ -4910,25 +4928,16 @@ reactor::rename_scheduling_group_specific_data(scheduling_group sg) {
 future<>
 reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstring shortname, float shares) {
     return with_shared(_scheduling_group_keys_mutex, [this, sg, name = std::move(name), shortname = std::move(shortname), shares] {
+        auto& sg_data = _scheduling_group_specific_data;
+        auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
+        this_sg.queue_is_initialized = true;
         _task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1));
         _task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shortname, shares);
 
-        using val_vector = decltype(_scheduling_group_specific_data.per_scheduling_group_data[sg._id].specific_vals);
-
-        return with_scheduling_group(sg, [this, sg] () {
-            auto& sg_data = _scheduling_group_specific_data;
-
-            val_vector vals;
-            vals.reserve(sg_data.scheduling_group_key_configs.size());
+        return with_scheduling_group(sg, [this, sg, &sg_data] () {
             for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
-                vals.resize(std::max(vals.size(), key_id+1));
-                vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg);
+                allocate_scheduling_group_specific_data(sg, key_id);
             }
-            return vals;
-        }).then([this, sg] (val_vector vals) {
-            auto& this_sg = get_sg_data(sg);
-            std::swap(this_sg.specific_vals, vals);
-            this_sg.queue_is_initialized = true;
         });
     });
 }
@@ -4936,10 +4945,10 @@ reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstri
 future<>
 reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
     return with_lock(_scheduling_group_keys_mutex, [this, key, cfg] {
+        auto& sg_data = _scheduling_group_specific_data;
         auto key_id = internal::scheduling_group_key_id(key);
-        auto cfgp = std::make_shared<scheduling_group_key_config>(std::move(cfg));
-
-        return parallel_for_each(_task_queues, [this, key_id, cfgp] (std::unique_ptr<task_queue>& tq) {
+        sg_data.scheduling_group_key_configs[key_id] = cfg;
+        return parallel_for_each(_task_queues, [this, cfg, key_id] (std::unique_ptr<task_queue>& tq) {
             if (tq) {
                 scheduling_group sg = scheduling_group(tq->_id);
                 if (tq.get() == _at_destroy_tasks) {
@@ -4947,21 +4956,14 @@ reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_grou
                     auto curr = current_scheduling_group();
                     auto cleanup = defer([curr] () noexcept { *internal::current_scheduling_group_ptr() = curr; });
                     *internal::current_scheduling_group_ptr() = sg;
-
-                    auto& this_sg = get_sg_data(sg);
-                    this_sg.specific_vals.resize(std::max(this_sg.specific_vals.size(), key_id+1));
-                    this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, std::move(cfgp));
+                    allocate_scheduling_group_specific_data(sg, key_id);
                 } else {
-                    return with_scheduling_group(sg, [this, key_id, sg, cfgp = std::move(cfgp)] () {
-                        auto& this_sg = get_sg_data(sg);
-                        this_sg.specific_vals.resize(std::max(this_sg.specific_vals.size(), key_id+1));
-                        this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, std::move(cfgp));
+                    return with_scheduling_group(sg, [this, key_id, sg] () {
+                        allocate_scheduling_group_specific_data(sg, key_id);
                     });
                 }
             }
             return make_ready_future();
-        }).then([this, key_id, cfgp] () {
-            _scheduling_group_specific_data.scheduling_group_key_configs[key_id] = std::move(cfgp);
         });
     });
 }
@@ -4972,9 +4974,22 @@ reactor::destroy_scheduling_group(scheduling_group sg) noexcept {
         on_fatal_internal_error(seastar_logger, format("Invalid scheduling_group {}", sg._id));
     }
     return with_scheduling_group(sg, [this, sg] () {
-        get_sg_data(sg).specific_vals.clear();
+        auto& sg_data = _scheduling_group_specific_data;
+        auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
+        for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
+            void* val = this_sg.specific_vals[key_id];
+            if (val) {
+                if (cfg.destructor) {
+                    cfg.destructor(val);
+                }
+                free(val);
+                this_sg.specific_vals[key_id] = nullptr;
+            }
+        }
     }).then( [this, sg] () {
-        get_sg_data(sg).queue_is_initialized = false;
+        auto& sg_data = _scheduling_group_specific_data;
+        auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
+        this_sg.queue_is_initialized = false;
         _task_queues[sg._id].reset();
     });
 }
diff --git a/tests/unit/scheduling_group_test.cc b/tests/unit/scheduling_group_test.cc
index 16c31bffe0..9b5f721726 100644
--- a/tests/unit/scheduling_group_test.cc
+++ b/tests/unit/scheduling_group_test.cc
@@ -375,75 +375,6 @@ SEASTAR_THREAD_TEST_CASE(sg_create_and_key_create_in_parallel) {
     }
 }
 
-SEASTAR_THREAD_TEST_CASE(sg_key_constructor_exception_when_creating_new_key) {
-    using ivec = std::vector<int>;
-    const int num_scheduling_groups = 4;
-
-    scheduling_group_key_config key1_conf = make_scheduling_group_key_config<int>();
-    scheduling_group_key key1 = scheduling_group_key_create(key1_conf).get();
-
-    struct thrower {
-        thrower() {
-            throw std::runtime_error("constructor failed");
-        }
-        ~thrower() {
-            // Shouldn't get here because the constructor shouldn't succeed
-            BOOST_ASSERT(false);
-        }
-    };
-    scheduling_group_key_config thrower_conf = make_scheduling_group_key_config<thrower>();
-    BOOST_REQUIRE_THROW(scheduling_group_key_create(thrower_conf).get(), std::runtime_error);
-
-    // check we can continue after the failure
-
-    std::vector<scheduling_group> sgs;
-    for (int i = 0; i < num_scheduling_groups; i++) {
-        sgs.push_back(create_scheduling_group(format("sg{}", i).c_str(), 100).get());
-    }
-
-    const auto destroy_scheduling_groups = defer([&sgs] () noexcept {
-        for (scheduling_group sg : sgs) {
-            destroy_scheduling_group(sg).get();
-        }
-    });
-
-    scheduling_group_key_config key2_conf = make_scheduling_group_key_config<ivec>();
-    scheduling_group_key key2 = scheduling_group_key_create(key2_conf).get();
-
-    smp::invoke_on_all([key1, key2, &sgs] () {
-        int factor = this_shard_id() + 1;
-        for (int i=0; i < num_scheduling_groups; i++) {
-            sgs[i].get_specific<int>(key1) = (i + 1) * factor;
-            sgs[i].get_specific<ivec>(key2).push_back((i + 1) * factor);
-        }
-
-        for (int i=0; i < num_scheduling_groups; i++) {
-            BOOST_REQUIRE_EQUAL(sgs[i].get_specific<int>(key1) = (i + 1) * factor, (i + 1) * factor);
-            BOOST_REQUIRE_EQUAL(sgs[i].get_specific<ivec>(key2)[0], (i + 1) * factor);
-        }
-
-    }).get();
-
-    smp::invoke_on_all([key1, key2] () {
-        return reduce_scheduling_group_specific<int>(std::plus<int>(), int(0), key1).then([] (int sum) {
-            int factor = this_shard_id() + 1;
-            int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2;
-            BOOST_REQUIRE_EQUAL(expected_sum, sum);
-        }). then([key2] {
-            auto ivec_to_int = [] (ivec& v) {
-                return v.size() ? v[0] : 0;
-            };
-
-            return map_reduce_scheduling_group_specific<ivec>(ivec_to_int, std::plus<int>(), int(0), key2).then([] (int sum) {
-                int factor = this_shard_id() + 1;
-                int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2;
-                BOOST_REQUIRE_EQUAL(expected_sum, sum);
-            });
-
-        });
-    }).get();
-}
-
 SEASTAR_THREAD_TEST_CASE(sg_create_with_destroy_tasks) {
     struct nada{};