// Patch overview (unified diff touching four files):
//   * include/seastar/core/reactor.hh - adds two private get_sg_data()
//     overloads that return the per_scheduling_group_data entry for a
//     scheduling_group or for a raw group id.
//   * include/seastar/core/scheduling_specific.hh - introduces specific_val,
//     which holds one per-group value together with its key config: the
//     constructor runs cfg->constructor, the destructor runs cfg->destructor,
//     and the storage is released through the `free` deleter passed to val_ptr.
//     specific_vals stores these objects and the key-config map stores cfg_ptr.
//   * src/core/reactor.cc - teardown paths become specific_vals.clear();
//     allocation failure throws std::runtime_error instead of calling abort;
//     init_scheduling_group() builds the values in a local vector and only
//     swaps them in (and sets queue_is_initialized) after all were created;
//     init_new_scheduling_group_key() stores the config in the map only after
//     the parallel_for_each over the task queues has completed.
//   * tests/unit/scheduling_group_test.cc - adds a test whose key constructor
//     throws, then checks that groups and keys can still be created and used.
//
// Review changes folded into this revision of the patch:
//   1. specific_val's move constructor and move assignment are noexcept.
//   2. Move assignment runs cfg->destructor on a value it already holds before
//      overwriting it; without this such a value is only handed to free().
//      The scheduling_specific.hh hunk headers account for the three added lines.
//   3. In the new test the key1 check compared the result of an assignment with
//      the value just assigned, so it could never fail; it now reads the value.
//
// NOTE(review): template argument lists look stripped in this copy of the
// patch (e.g. "std::unique_ptr;", "std::vector specific_vals;") - presumably
// an extraction artifact; confirm against the original patch before applying.
// NOTE(review): if a key constructor throws part-way through
// init_new_scheduling_group_key(), values already created for other groups
// appear to stay in their specific_vals slots - confirm that is acceptable.
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index e4e438676f..e6b0996cc3 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -322,6 +322,14 @@ private: std::atomic _dying{false}; gate _background_gate; + inline auto& get_sg_data(const scheduling_group& sg) { + return _scheduling_group_specific_data.per_scheduling_group_data[sg._id]; + } + + inline auto& get_sg_data(unsigned sg_id) { + return _scheduling_group_specific_data.per_scheduling_group_data[sg_id]; + } + private: static std::chrono::nanoseconds calculate_poll_time(); static void block_notifier(int); diff --git a/include/seastar/core/scheduling_specific.hh b/include/seastar/core/scheduling_specific.hh index 0a9c9d8b3d..bdb51110d7 100644 --- a/include/seastar/core/scheduling_specific.hh +++ b/include/seastar/core/scheduling_specific.hh @@ -37,6 +37,52 @@ namespace seastar { namespace internal { struct scheduling_group_specific_thread_local_data { + using val_ptr = std::unique_ptr; + using cfg_ptr = lw_shared_ptr; + + struct specific_val { + val_ptr valp; + cfg_ptr cfg; + + specific_val() : valp(nullptr, &free), cfg(nullptr) {} + + specific_val(val_ptr&& valp_, const cfg_ptr& cfg_) : valp(std::move(valp_)), cfg(cfg_) { + if (valp && cfg->constructor) { + cfg->constructor(valp.get()); + } + } + + ~specific_val() { + if (valp && cfg->destructor) { + cfg->destructor(valp.get()); + } + } + + specific_val(const specific_val& other) = delete; + specific_val& operator=(const specific_val& other) = delete; + + specific_val(specific_val&& other) noexcept : valp(std::move(other.valp)), cfg(std::move(other.cfg)) {} + + specific_val& operator=(specific_val&& other) noexcept { + if (this != &other) { + if (valp && cfg->destructor) { + cfg->destructor(valp.get()); + } + valp = std::move(other.valp); + cfg = std::move(other.cfg); + } + return *this; + } + + void* get() { return valp.get(); } + + void rename() { + if (valp && cfg->rename) { + cfg->rename(valp.get()); + } + } + }; + struct per_scheduling_group { bool queue_is_initialized = 
false; /** @@ -44,10 +90,16 @@ struct scheduling_group_specific_thread_local_data { * data. The pointer is not use as is but is cast to a reference * to the appropriate type that is actually pointed to. */ - std::vector specific_vals; + std::vector specific_vals; + + void rename() { + for (auto& v : specific_vals) { + v.rename(); + } + } }; std::array per_scheduling_group_data; - std::map scheduling_group_key_configs; + std::map scheduling_group_key_configs; }; #ifdef SEASTAR_BUILD_SHARED_LIBS @@ -78,12 +130,12 @@ template T* scheduling_group_get_specific_ptr(scheduling_group sg, scheduling_group_key key) noexcept { auto& data = internal::get_scheduling_group_specific_thread_local_data(); #ifdef SEASTAR_DEBUG - assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index); + assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index); #endif auto sg_id = internal::scheduling_group_index(sg); if (__builtin_expect(sg_id < data.per_scheduling_group_data.size() && data.per_scheduling_group_data[sg_id].queue_is_initialized, true)) { - return reinterpret_cast(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]); + return reinterpret_cast(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get()); } return nullptr; } @@ -123,9 +175,9 @@ T& scheduling_group_get_specific(scheduling_group_key key) noexcept { // return a reference to an element whose queue_is_initialized is // false. 
auto& data = internal::get_scheduling_group_specific_thread_local_data(); - assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index); + assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index); auto sg_id = internal::scheduling_group_index(current_scheduling_group()); - return *reinterpret_cast(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]); + return *reinterpret_cast(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get()); } /** @@ -155,7 +207,7 @@ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer, auto wrapped_mapper = [key, mapper] (per_scheduling_group& psg) { auto id = internal::scheduling_group_key_id(key); return make_ready_future::return_type> - (mapper(*reinterpret_cast(psg.specific_vals[id]))); + (mapper(*reinterpret_cast(psg.specific_vals[id].get()))); }; return map_reduce( @@ -188,7 +240,7 @@ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, schedulin auto mapper = [key] (per_scheduling_group& psg) { auto id = internal::scheduling_group_key_id(key); - return make_ready_future(*reinterpret_cast(psg.specific_vals[id])); + return make_ready_future(*reinterpret_cast(psg.specific_vals[id].get())); }; return map_reduce( diff --git a/src/core/reactor.cc b/src/core/reactor.cc index d25d25a3d5..2fff95b41d 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -1066,23 +1066,12 @@ reactor::~reactor() { eraser(_expired_timers); eraser(_expired_lowres_timers); eraser(_expired_manual_timers); - auto& sg_data = _scheduling_group_specific_data; for (auto&& tq : _task_queues) { if (tq) { - auto& this_sg = sg_data.per_scheduling_group_data[tq->_id]; // The following line will preserve the convention that constructor and destructor functions // for the per sg values are called in the context of the containing scheduling group. 
*internal::current_scheduling_group_ptr() = scheduling_group(tq->_id); - for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) { - void* val = this_sg.specific_vals[key_id]; - if (val) { - if (cfg.destructor) { - cfg.destructor(val); - } - free(val); - this_sg.specific_vals[key_id] = nullptr; - } - } + get_sg_data(tq->_id).specific_vals.clear(); } } } @@ -4896,29 +4885,23 @@ deallocate_scheduling_group_id(unsigned id) noexcept { } static -void* -allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const scheduling_group_key_config& cfg) { - void* val = aligned_alloc(cfg.alignment, cfg.allocation_size); - if (!val) { - std::abort(); - } - if (cfg.constructor) { - cfg.constructor(val); +internal::scheduling_group_specific_thread_local_data::specific_val +allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const lw_shared_ptr& cfg) { + using val_ptr = internal::scheduling_group_specific_thread_local_data::val_ptr; + using specific_val = internal::scheduling_group_specific_thread_local_data::specific_val; + + val_ptr valp(aligned_alloc(cfg->alignment, cfg->allocation_size), &free); + if (!valp) { + throw std::runtime_error("memory allocation failed"); } - return val; + return specific_val(std::move(valp), cfg); } future<> reactor::rename_scheduling_group_specific_data(scheduling_group sg) { return with_shared(_scheduling_group_keys_mutex, [this, sg] { return with_scheduling_group(sg, [this, sg] { - auto& sg_data = _scheduling_group_specific_data; - auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; - for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) { - if (cfg.rename) { - (cfg.rename)(this_sg.specific_vals[key_id]); - } - } + get_sg_data(sg).rename(); }); }); } @@ -4926,19 +4909,25 @@ reactor::rename_scheduling_group_specific_data(scheduling_group sg) { future<> reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstring shortname, float 
shares) { return with_shared(_scheduling_group_keys_mutex, [this, sg, name = std::move(name), shortname = std::move(shortname), shares] { - auto& sg_data = _scheduling_group_specific_data; - auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; - this_sg.queue_is_initialized = true; _task_queues.resize(std::max(_task_queues.size(), sg._id + 1)); _task_queues[sg._id] = std::make_unique(sg._id, name, shortname, shares); - return with_scheduling_group(sg, [this, sg, &sg_data] () { + using val_vector = decltype(_scheduling_group_specific_data.per_scheduling_group_data[sg._id].specific_vals); + + return with_scheduling_group(sg, [this, sg] () { + auto& sg_data = _scheduling_group_specific_data; + + val_vector vals; + vals.reserve(sg_data.scheduling_group_key_configs.size()); for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) { - auto& sg_data = _scheduling_group_specific_data; - auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; - this_sg.specific_vals.resize(std::max(this_sg.specific_vals.size(), key_id+1)); - this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg); + vals.resize(std::max(vals.size(), key_id+1)); + vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg); } + return vals; + }).then([this, sg] (val_vector vals) { + auto& this_sg = get_sg_data(sg); + std::swap(this_sg.specific_vals, vals); + this_sg.queue_is_initialized = true; }); }); } @@ -4946,10 +4935,10 @@ reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstri future<> reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) { return with_lock(_scheduling_group_keys_mutex, [this, key, cfg] { - auto& sg_data = _scheduling_group_specific_data; auto key_id = internal::scheduling_group_key_id(key); - sg_data.scheduling_group_key_configs[key_id] = cfg; - return parallel_for_each(_task_queues, [this, cfg, key_id] (std::unique_ptr& tq) { + auto cfgp = 
make_lw_shared(std::move(cfg)); + + return parallel_for_each(_task_queues, [this, key_id, cfgp] (std::unique_ptr& tq) { if (tq) { scheduling_group sg = scheduling_group(tq->_id); if (tq.get() == _at_destroy_tasks) { @@ -4957,20 +4946,20 @@ reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_grou auto curr = current_scheduling_group(); auto cleanup = defer([curr] () noexcept { *internal::current_scheduling_group_ptr() = curr; }); *internal::current_scheduling_group_ptr() = sg; - auto& sg_data = _scheduling_group_specific_data; - auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; + auto& this_sg = get_sg_data(sg); this_sg.specific_vals.resize(std::max(this_sg.specific_vals.size(), key_id+1)); - this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, sg_data.scheduling_group_key_configs[key_id]); + this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, std::move(cfgp)); } else { - return with_scheduling_group(sg, [this, key_id, sg] () { - auto& sg_data = _scheduling_group_specific_data; - auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; + return with_scheduling_group(sg, [this, key_id, sg, cfgp = std::move(cfgp)] () { + auto& this_sg = get_sg_data(sg); this_sg.specific_vals.resize(std::max(this_sg.specific_vals.size(), key_id+1)); - this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, sg_data.scheduling_group_key_configs[key_id]); + this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, std::move(cfgp)); }); } } return make_ready_future(); + }).then([this, key_id, cfgp] () { + _scheduling_group_specific_data.scheduling_group_key_configs[key_id] = std::move(cfgp); }); }); } @@ -4981,22 +4970,9 @@ reactor::destroy_scheduling_group(scheduling_group sg) noexcept { on_fatal_internal_error(seastar_logger, format("Invalid scheduling_group {}", sg._id)); } return with_scheduling_group(sg, [this, sg] () { - auto& sg_data 
= _scheduling_group_specific_data; - auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; - for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) { - void* val = this_sg.specific_vals[key_id]; - if (val) { - if (cfg.destructor) { - cfg.destructor(val); - } - free(val); - this_sg.specific_vals[key_id] = nullptr; - } - } + get_sg_data(sg).specific_vals.clear(); }).then( [this, sg] () { - auto& sg_data = _scheduling_group_specific_data; - auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; - this_sg.queue_is_initialized = false; + get_sg_data(sg).queue_is_initialized = false; _task_queues[sg._id].reset(); }); diff --git a/tests/unit/scheduling_group_test.cc b/tests/unit/scheduling_group_test.cc index 9b5f721726..16c31bffe0 100644 --- a/tests/unit/scheduling_group_test.cc +++ b/tests/unit/scheduling_group_test.cc @@ -375,6 +375,75 @@ SEASTAR_THREAD_TEST_CASE(sg_create_and_key_create_in_parallel) { } } +SEASTAR_THREAD_TEST_CASE(sg_key_constructor_exception_when_creating_new_key) { + using ivec = std::vector; + const int num_scheduling_groups = 4; + + scheduling_group_key_config key1_conf = make_scheduling_group_key_config(); + scheduling_group_key key1 = scheduling_group_key_create(key1_conf).get(); + + struct thrower { + thrower() { + throw std::runtime_error("constructor failed"); + } + ~thrower() { + // Shouldn't get here because the constructor shouldn't succeed + BOOST_ASSERT(false); + } + }; + scheduling_group_key_config thrower_conf = make_scheduling_group_key_config(); + BOOST_REQUIRE_THROW(scheduling_group_key_create(thrower_conf).get(), std::runtime_error); + + // check we can continue after the failure + + std::vector sgs; + for (int i = 0; i < num_scheduling_groups; i++) { + sgs.push_back(create_scheduling_group(format("sg{}", i).c_str(), 100).get()); + } + + const auto destroy_scheduling_groups = defer([&sgs] () noexcept { + for (scheduling_group sg : sgs) { + destroy_scheduling_group(sg).get(); + } + }); + 
scheduling_group_key_config key2_conf = make_scheduling_group_key_config(); + scheduling_group_key key2 = scheduling_group_key_create(key2_conf).get(); + + smp::invoke_on_all([key1, key2, &sgs] () { + int factor = this_shard_id() + 1; + for (int i=0; i < num_scheduling_groups; i++) { + sgs[i].get_specific(key1) = (i + 1) * factor; + sgs[i].get_specific(key2).push_back((i + 1) * factor); + } + + for (int i=0; i < num_scheduling_groups; i++) { + BOOST_REQUIRE_EQUAL(sgs[i].get_specific(key1), (i + 1) * factor); + BOOST_REQUIRE_EQUAL(sgs[i].get_specific(key2)[0], (i + 1) * factor); + } + + }).get(); + + smp::invoke_on_all([key1, key2] () { + return reduce_scheduling_group_specific(std::plus(), int(0), key1).then([] (int sum) { + int factor = this_shard_id() + 1; + int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2; + BOOST_REQUIRE_EQUAL(expected_sum, sum); + }). then([key2] { + auto ivec_to_int = [] (ivec& v) { + return v.size() ? v[0] : 0; + }; + + return map_reduce_scheduling_group_specific(ivec_to_int, std::plus(), int(0), key2).then([] (int sum) { + int factor = this_shard_id() + 1; + int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2; + BOOST_REQUIRE_EQUAL(expected_sum, sum); + }); + + }); + }).get(); +} + SEASTAR_THREAD_TEST_CASE(sg_create_with_destroy_tasks) { struct nada{};