Skip to content

Commit

Permalink
scheduling_group: improve scheduling group creation exception safety
Browse files Browse the repository at this point in the history
Improve handling of exceptions during scheduling group and scheduling
group key creation, where a user-provided constructor for the keys may
fail, for example.

We use a new struct `specific_val` and smart pointers to manage memory
allocation, construction and destruction of scheduling group data in a
safe manner.

Fixes #2222
  • Loading branch information
mlitvk committed Jan 26, 2025
1 parent 19c266c commit 5666802
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 67 deletions.
8 changes: 8 additions & 0 deletions include/seastar/core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,14 @@ private:
std::atomic<bool> _dying{false};
gate _background_gate;

inline auto& get_sg_data(const scheduling_group& sg) {
return _scheduling_group_specific_data.per_scheduling_group_data[sg._id];
}

inline auto& get_sg_data(unsigned sg_id) {
return _scheduling_group_specific_data.per_scheduling_group_data[sg_id];
}

private:
static std::chrono::nanoseconds calculate_poll_time();
static void block_notifier(int);
Expand Down
65 changes: 57 additions & 8 deletions include/seastar/core/scheduling_specific.hh
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,66 @@ namespace seastar {
namespace internal {

struct scheduling_group_specific_thread_local_data {
using val_ptr = std::unique_ptr<void, void (*)(void*) noexcept>;
using cfg_ptr = lw_shared_ptr<scheduling_group_key_config>;

struct specific_val {
val_ptr valp;
cfg_ptr cfg;

specific_val() : valp(nullptr, &free), cfg(nullptr) {}

specific_val(val_ptr&& valp_, const cfg_ptr& cfg_) : valp(std::move(valp_)), cfg(cfg_) {
if (valp && cfg->constructor) {
cfg->constructor(valp.get());
}
}

~specific_val() {
if (valp && cfg->destructor) {
cfg->destructor(valp.get());
}
}

specific_val(const specific_val& other) = delete;
specific_val& operator=(const specific_val& other) = delete;

specific_val(specific_val&& other) : valp(std::move(other.valp)), cfg(std::move(other.cfg)) {}

specific_val& operator=(specific_val&& other) {
if (this != &other) {
valp = std::move(other.valp);
cfg = std::move(other.cfg);
}
return *this;
}

void* get() { return valp.get(); }

void rename() {
if (valp && cfg->rename) {
cfg->rename(valp.get());
}
}
};

struct per_scheduling_group {
bool queue_is_initialized = false;
/**
* This array holds pointers to the scheduling group specific
* data. The pointer is not use as is but is cast to a reference
* to the appropriate type that is actually pointed to.
*/
std::vector<void*> specific_vals;
std::vector<specific_val> specific_vals;

void rename() {
for (auto& v : specific_vals) {
v.rename();
}
}
};
std::array<per_scheduling_group, max_scheduling_groups()> per_scheduling_group_data;
std::map<unsigned long, scheduling_group_key_config> scheduling_group_key_configs;
std::map<unsigned long, cfg_ptr> scheduling_group_key_configs;
};

#ifdef SEASTAR_BUILD_SHARED_LIBS
Expand Down Expand Up @@ -78,12 +127,12 @@ template<typename T>
T* scheduling_group_get_specific_ptr(scheduling_group sg, scheduling_group_key key) noexcept {
auto& data = internal::get_scheduling_group_specific_thread_local_data();
#ifdef SEASTAR_DEBUG
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index);
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index);
#endif
auto sg_id = internal::scheduling_group_index(sg);
if (__builtin_expect(sg_id < data.per_scheduling_group_data.size() &&
data.per_scheduling_group_data[sg_id].queue_is_initialized, true)) {
return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]);
return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get());
}
return nullptr;
}
Expand Down Expand Up @@ -123,9 +172,9 @@ T& scheduling_group_get_specific(scheduling_group_key key) noexcept {
// return a reference to an element whose queue_is_initialized is
// false.
auto& data = internal::get_scheduling_group_specific_thread_local_data();
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index);
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index);
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]);
return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get());
}

/**
Expand Down Expand Up @@ -155,7 +204,7 @@ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer,
auto wrapped_mapper = [key, mapper] (per_scheduling_group& psg) {
auto id = internal::scheduling_group_key_id(key);
return make_ready_future<typename function_traits<Mapper>::return_type>
(mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id])));
(mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get())));
};

return map_reduce(
Expand Down Expand Up @@ -188,7 +237,7 @@ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, schedulin

auto mapper = [key] (per_scheduling_group& psg) {
auto id = internal::scheduling_group_key_id(key);
return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id]));
return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get()));
};

return map_reduce(
Expand Down
84 changes: 25 additions & 59 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1066,23 +1066,12 @@ reactor::~reactor() {
eraser(_expired_timers);
eraser(_expired_lowres_timers);
eraser(_expired_manual_timers);
auto& sg_data = _scheduling_group_specific_data;
for (auto&& tq : _task_queues) {
if (tq) {
auto& this_sg = sg_data.per_scheduling_group_data[tq->_id];
// The following line will preserve the convention that constructor and destructor functions
// for the per sg values are called in the context of the containing scheduling group.
*internal::current_scheduling_group_ptr() = scheduling_group(tq->_id);
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
void* val = this_sg.specific_vals[key_id];
if (val) {
if (cfg.destructor) {
cfg.destructor(val);
}
free(val);
this_sg.specific_vals[key_id] = nullptr;
}
}
get_sg_data(tq->_id).specific_vals.clear();
}
}
}
Expand Down Expand Up @@ -4896,46 +4885,38 @@ deallocate_scheduling_group_id(unsigned id) noexcept {
}

static
void*
allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const scheduling_group_key_config& cfg) {
void* val = aligned_alloc(cfg.alignment, cfg.allocation_size);
if (!val) {
std::abort();
}
if (cfg.constructor) {
cfg.constructor(val);
internal::scheduling_group_specific_thread_local_data::specific_val
allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const lw_shared_ptr<scheduling_group_key_config>& cfg) {
using val_ptr = internal::scheduling_group_specific_thread_local_data::val_ptr;
using specific_val = internal::scheduling_group_specific_thread_local_data::specific_val;

val_ptr valp(aligned_alloc(cfg->alignment, cfg->allocation_size), &free);
if (!valp) {
throw std::runtime_error("memory allocation failed");
}
return val;
return specific_val(std::move(valp), cfg);
}

future<>
reactor::rename_scheduling_group_specific_data(scheduling_group sg) {
return with_shared(_scheduling_group_keys_mutex, [this, sg] {
return with_scheduling_group(sg, [this, sg] {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
if (cfg.rename) {
(cfg.rename)(this_sg.specific_vals[key_id]);
}
}
get_sg_data(sg).rename();
});
});
}

future<>
reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstring shortname, float shares) {
return with_shared(_scheduling_group_keys_mutex, [this, sg, name = std::move(name), shortname = std::move(shortname), shares] {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
this_sg.queue_is_initialized = true;
get_sg_data(sg).queue_is_initialized = true;
_task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1));
_task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shortname, shares);

return with_scheduling_group(sg, [this, sg, &sg_data] () {
return with_scheduling_group(sg, [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = get_sg_data(sg);
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg);
}
Expand All @@ -4946,27 +4927,25 @@ reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstri
future<>
reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
return with_lock(_scheduling_group_keys_mutex, [this, key, cfg] {
auto& sg_data = _scheduling_group_specific_data;
auto key_id = internal::scheduling_group_key_id(key);
sg_data.scheduling_group_key_configs[key_id] = cfg;
return parallel_for_each(_task_queues, [this, cfg, key_id] (std::unique_ptr<task_queue>& tq) {
auto cfgp = make_lw_shared<scheduling_group_key_config>(std::move(cfg));
_scheduling_group_specific_data.scheduling_group_key_configs[key_id] = cfgp;
return parallel_for_each(_task_queues, [this, cfgp, key_id] (std::unique_ptr<task_queue>& tq) {
if (tq) {
scheduling_group sg = scheduling_group(tq->_id);
if (tq.get() == _at_destroy_tasks) {
// fake the group by assuming it here
auto curr = current_scheduling_group();
auto cleanup = defer([curr] () noexcept { *internal::current_scheduling_group_ptr() = curr; });
*internal::current_scheduling_group_ptr() = sg;
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
auto& this_sg = get_sg_data(sg);
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, sg_data.scheduling_group_key_configs[key_id]);
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfgp);
} else {
return with_scheduling_group(sg, [this, key_id, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
return with_scheduling_group(sg, [this, key_id, sg, cfgp = std::move(cfgp)] () {
auto& this_sg = get_sg_data(sg);
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, sg_data.scheduling_group_key_configs[key_id]);
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfgp);
});
}
}
Expand All @@ -4981,22 +4960,9 @@ reactor::destroy_scheduling_group(scheduling_group sg) noexcept {
on_fatal_internal_error(seastar_logger, format("Invalid scheduling_group {}", sg._id));
}
return with_scheduling_group(sg, [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
void* val = this_sg.specific_vals[key_id];
if (val) {
if (cfg.destructor) {
cfg.destructor(val);
}
free(val);
this_sg.specific_vals[key_id] = nullptr;
}
}
get_sg_data(sg).specific_vals.clear();
}).then( [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
this_sg.queue_is_initialized = false;
get_sg_data(sg).queue_is_initialized = false;
_task_queues[sg._id].reset();
});

Expand Down
17 changes: 17 additions & 0 deletions tests/unit/scheduling_group_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,23 @@ SEASTAR_THREAD_TEST_CASE(sg_create_and_key_create_in_parallel) {
}
}

SEASTAR_THREAD_TEST_CASE(sg_key_constructor_exception_when_creating_new_key) {
scheduling_group_key_config key_conf = make_scheduling_group_key_config<int>();
scheduling_group_key_create(key_conf).get();

struct thrower {
thrower() {
throw std::runtime_error("constructor failed");
}
~thrower() {
// Shouldn't get here because the constructor shouldn't succeed
BOOST_ASSERT(false);
}
};
scheduling_group_key_config thrower_conf = make_scheduling_group_key_config<thrower>();
BOOST_REQUIRE_THROW(scheduling_group_key_create(thrower_conf).get(), std::runtime_error);
}

SEASTAR_THREAD_TEST_CASE(sg_create_with_destroy_tasks) {
struct nada{};

Expand Down

0 comments on commit 5666802

Please sign in to comment.