scheduling_group: improve scheduling group creation exception safety #2617

Merged 2 commits on Jan 28, 2025
9 changes: 8 additions & 1 deletion include/seastar/core/reactor.hh
@@ -322,6 +322,14 @@ private:
std::atomic<bool> _dying{false};
gate _background_gate;

inline auto& get_sg_data(const scheduling_group& sg) {
return _scheduling_group_specific_data.per_scheduling_group_data[sg._id];
}

inline auto& get_sg_data(unsigned sg_id) {
return _scheduling_group_specific_data.per_scheduling_group_data[sg_id];
}

Contributor:

There's no need for those helpers, AFAICS; the existing get_scheduling_group_specific_thread_local_data() (and co.) already provides access to the array of per_scheduling_group_data-s.

Contributor Author:

I added them because in several places we access the sg data and do

    auto& sg_data = _scheduling_group_specific_data;
    auto& this_sg = sg_data.per_scheduling_group_data[sg._id];

which I thought was a little cumbersome, and I didn't find another method for this.
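
For illustration, a sketch of how a typical call site changes with the helper (member names taken from this diff):

    // before: the two-step lookup spelled out at every call site
    auto& sg_data = _scheduling_group_specific_data;
    auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
    this_sg.queue_is_initialized = true;

    // after: one expression through the helper
    get_sg_data(sg).queue_is_initialized = true;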

private:
static std::chrono::nanoseconds calculate_poll_time();
static void block_notifier(int);
@@ -393,7 +401,6 @@ private:
task_queue* pop_active_task_queue(sched_clock::time_point now);
void insert_activating_task_queues();
void account_runtime(task_queue& tq, sched_clock::duration runtime);
void allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id);
future<> rename_scheduling_group_specific_data(scheduling_group sg);
future<> init_scheduling_group(scheduling_group sg, sstring name, sstring shortname, float shares);
future<> init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg);
65 changes: 57 additions & 8 deletions include/seastar/core/scheduling_specific.hh
@@ -37,17 +37,66 @@ namespace seastar {
namespace internal {

struct scheduling_group_specific_thread_local_data {
using val_ptr = std::unique_ptr<void, void (*)(void*) noexcept>;
Contributor:

What's the point of making it a smart pointer if you track construction/destruction by hand anyway?

Contributor Author:

The point is to manage the dynamic memory allocation and free it automatically.
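
A minimal self-contained sketch of that split responsibility, with a hypothetical raw_free standing in for the noexcept deleter: the unique_ptr guarantees the raw allocation is released even if the typed constructor throws, while specific_val's destructor runs the typed destructor first.

    #include <cstdlib>
    #include <memory>
    #include <stdexcept>

    // hypothetical deleter with the same noexcept signature as in the PR
    void raw_free(void* p) noexcept { std::free(p); }

    using val_ptr = std::unique_ptr<void, void (*)(void*) noexcept>;

    int main() {
        try {
            val_ptr p(std::malloc(sizeof(int)), &raw_free);
            // simulate a throwing per-key constructor callback: during
            // unwinding the unique_ptr's deleter still frees the raw
            // memory, which is the exception-safety gain
            throw std::runtime_error("constructor failed");
        } catch (const std::runtime_error&) {
            // the allocation was already released by val_ptr
        }
        return 0;
    }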

using cfg_ptr = lw_shared_ptr<scheduling_group_key_config>;

struct specific_val {
val_ptr valp;
cfg_ptr cfg;

specific_val() : valp(nullptr, &free), cfg(nullptr) {}

specific_val(val_ptr&& valp_, const cfg_ptr& cfg_) : valp(std::move(valp_)), cfg(cfg_) {
if (valp && cfg->constructor) {
cfg->constructor(valp.get());
}
}

~specific_val() {
if (valp && cfg->destructor) {
cfg->destructor(valp.get());
}
}

specific_val(const specific_val& other) = delete;
specific_val& operator=(const specific_val& other) = delete;

specific_val(specific_val&& other) : valp(std::move(other.valp)), cfg(std::move(other.cfg)) {}

specific_val& operator=(specific_val&& other) {
if (this != &other) {
valp = std::move(other.valp);
cfg = std::move(other.cfg);
}
return *this;
}

void* get() { return valp.get(); }

void rename() {
if (valp && cfg->rename) {
cfg->rename(valp.get());
}
}
};

struct per_scheduling_group {
bool queue_is_initialized = false;
/**
* This array holds pointers to the scheduling group specific
* data. The pointer is not used as-is but is cast to a reference
* to the appropriate type that is actually pointed to.
*/
std::vector<void*> specific_vals;
std::vector<specific_val> specific_vals;

void rename() {
for (auto& v : specific_vals) {
v.rename();
}
}
};
std::array<per_scheduling_group, max_scheduling_groups()> per_scheduling_group_data;
std::map<unsigned long, scheduling_group_key_config> scheduling_group_key_configs;
std::map<unsigned long, cfg_ptr> scheduling_group_key_configs;
Contributor:

It looks like this map is no longer needed? The very same config can be obtained via per_scheduling_group_data[context.sg_id].specific_vals[key_id].cfg. I'm not proposing to change anything right now, just checking whether my understanding is correct.

Contributor Author:

It's still used in init_scheduling_group, when we initialize a new scheduling group and go over all the keys to allocate them for the new sg.
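
A self-contained model of the two lookup paths in question, with std::shared_ptr standing in for lw_shared_ptr (a sketch, not seastar code):

    #include <cstddef>
    #include <map>
    #include <memory>
    #include <vector>

    struct key_config { std::size_t allocation_size = 0; };
    using cfg_ptr = std::shared_ptr<key_config>;

    struct specific_val { cfg_ptr cfg; };
    struct per_scheduling_group { std::vector<specific_val> specific_vals; };

    int main() {
        std::map<unsigned long, cfg_ptr> scheduling_group_key_configs;
        per_scheduling_group psg;

        auto cfg = std::make_shared<key_config>();
        scheduling_group_key_configs[0] = cfg;           // path 1: via the map
        psg.specific_vals.push_back(specific_val{cfg});  // path 2: via the value

        // both paths reach the same config, but only the map can enumerate
        // the registered keys for a group whose specific_vals is still
        // empty, which is exactly the init_scheduling_group case
        return scheduling_group_key_configs[0] == psg.specific_vals[0].cfg ? 0 : 1;
    }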

};

#ifdef SEASTAR_BUILD_SHARED_LIBS
@@ -78,12 +127,12 @@ template<typename T>
T* scheduling_group_get_specific_ptr(scheduling_group sg, scheduling_group_key key) noexcept {
auto& data = internal::get_scheduling_group_specific_thread_local_data();
#ifdef SEASTAR_DEBUG
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index);
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index);
#endif
auto sg_id = internal::scheduling_group_index(sg);
if (__builtin_expect(sg_id < data.per_scheduling_group_data.size() &&
data.per_scheduling_group_data[sg_id].queue_is_initialized, true)) {
return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]);
return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get());
}
return nullptr;
}
@@ -123,9 +172,9 @@ T& scheduling_group_get_specific(scheduling_group_key key) noexcept {
// return a reference to an element whose queue_is_initialized is
// false.
auto& data = internal::get_scheduling_group_specific_thread_local_data();
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index);
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index);
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]);
return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get());
}

/**
@@ -155,7 +204,7 @@ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer,
auto wrapped_mapper = [key, mapper] (per_scheduling_group& psg) {
auto id = internal::scheduling_group_key_id(key);
return make_ready_future<typename function_traits<Mapper>::return_type>
(mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id])));
(mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get())));
};

return map_reduce(
@@ -188,7 +237,7 @@ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, schedulin

auto mapper = [key] (per_scheduling_group& psg) {
auto id = internal::scheduling_group_key_id(key);
return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id]));
return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get()));
};

return map_reduce(
87 changes: 30 additions & 57 deletions src/core/reactor.cc
@@ -1066,23 +1066,12 @@ reactor::~reactor() {
eraser(_expired_timers);
eraser(_expired_lowres_timers);
eraser(_expired_manual_timers);
auto& sg_data = _scheduling_group_specific_data;
for (auto&& tq : _task_queues) {
if (tq) {
auto& this_sg = sg_data.per_scheduling_group_data[tq->_id];
// The following line will preserve the convention that constructor and destructor functions
// for the per sg values are called in the context of the containing scheduling group.
*internal::current_scheduling_group_ptr() = scheduling_group(tq->_id);
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
void* val = this_sg.specific_vals[key_id];
if (val) {
if (cfg.destructor) {
cfg.destructor(val);
}
free(val);
this_sg.specific_vals[key_id] = nullptr;
}
}
get_sg_data(tq->_id).specific_vals.clear();
}
}
}
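
The single clear() is enough because specific_val is now RAII; a self-contained sketch of the destruction ordering (model types only, not seastar code):

    #include <cstdio>
    #include <cstdlib>
    #include <memory>
    #include <vector>

    void raw_free(void* p) noexcept { std::free(p); }

    struct specific_val_model {
        std::unique_ptr<void, void (*)(void*) noexcept> valp{nullptr, &raw_free};
        specific_val_model() = default;
        specific_val_model(specific_val_model&&) = default;
        specific_val_model& operator=(specific_val_model&&) = default;
        ~specific_val_model() {
            if (valp) {
                // the per-key destructor callback would run here, before
                // the unique_ptr's deleter frees the raw memory
                std::puts("typed destructor callback");
            }
        }
    };

    int main() {
        std::vector<specific_val_model> vals(1);
        vals[0].valp.reset(std::malloc(4));
        vals.clear();  // stands in for get_sg_data(tq->_id).specific_vals.clear()
        return 0;
    }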
@@ -4895,48 +4884,41 @@ deallocate_scheduling_group_id(unsigned id) noexcept {
s_used_scheduling_group_ids_bitmap.fetch_and(~(1ul << id), std::memory_order_relaxed);
}

void
reactor::allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id) {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
const auto& cfg = sg_data.scheduling_group_key_configs[key_id];
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = aligned_alloc(cfg.alignment, cfg.allocation_size);
if (!this_sg.specific_vals[key_id]) {
std::abort();
}
if (cfg.constructor) {
cfg.constructor(this_sg.specific_vals[key_id]);
static
internal::scheduling_group_specific_thread_local_data::specific_val
allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const lw_shared_ptr<scheduling_group_key_config>& cfg) {
using val_ptr = internal::scheduling_group_specific_thread_local_data::val_ptr;
using specific_val = internal::scheduling_group_specific_thread_local_data::specific_val;

val_ptr valp(aligned_alloc(cfg->alignment, cfg->allocation_size), &free);
if (!valp) {
throw std::runtime_error("memory allocation failed");
Contributor:

It was std::abort() before this patch. Why did you change that?

Contributor Author:

Because the intention of the patch is to improve error handling and allow the node to continue to function in the face of unexpected errors. If we can handle exceptions on this path, I don't think there's any reason to abort in this specific case anymore.
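
A usage sketch of what the change buys callers, assuming a seastar thread context as in the unit tests below (public API; the exception type is the one thrown above):

    auto cfg = seastar::make_scheduling_group_key_config<int>();
    try {
        auto key = seastar::scheduling_group_key_create(cfg).get();
        // ... use the key ...
    } catch (const std::runtime_error& e) {
        // allocation or per-key constructor failure; the node keeps running
    }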

}
return specific_val(std::move(valp), cfg);
}

future<>
reactor::rename_scheduling_group_specific_data(scheduling_group sg) {
return with_shared(_scheduling_group_keys_mutex, [this, sg] {
return with_scheduling_group(sg, [this, sg] {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
if (cfg.rename) {
(cfg.rename)(this_sg.specific_vals[key_id]);
}
}
get_sg_data(sg).rename();
});
});
}

future<>
reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstring shortname, float shares) {
return with_shared(_scheduling_group_keys_mutex, [this, sg, name = std::move(name), shortname = std::move(shortname), shares] {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
this_sg.queue_is_initialized = true;
get_sg_data(sg).queue_is_initialized = true;
_task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1));
_task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shortname, shares);

return with_scheduling_group(sg, [this, sg, &sg_data] () {
return with_scheduling_group(sg, [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = get_sg_data(sg);
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
allocate_scheduling_group_specific_data(sg, key_id);
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg);
}
});
});
@@ -4945,21 +4927,25 @@
future<>
reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
return with_lock(_scheduling_group_keys_mutex, [this, key, cfg] {
auto& sg_data = _scheduling_group_specific_data;
auto key_id = internal::scheduling_group_key_id(key);
sg_data.scheduling_group_key_configs[key_id] = cfg;
return parallel_for_each(_task_queues, [this, cfg, key_id] (std::unique_ptr<task_queue>& tq) {
auto cfgp = make_lw_shared<scheduling_group_key_config>(std::move(cfg));
_scheduling_group_specific_data.scheduling_group_key_configs[key_id] = cfgp;
return parallel_for_each(_task_queues, [this, cfgp, key_id] (std::unique_ptr<task_queue>& tq) {
if (tq) {
scheduling_group sg = scheduling_group(tq->_id);
if (tq.get() == _at_destroy_tasks) {
// fake the group by assuming it here
auto curr = current_scheduling_group();
auto cleanup = defer([curr] () noexcept { *internal::current_scheduling_group_ptr() = curr; });
*internal::current_scheduling_group_ptr() = sg;
allocate_scheduling_group_specific_data(sg, key_id);
auto& this_sg = get_sg_data(sg);
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfgp);
} else {
return with_scheduling_group(sg, [this, key_id, sg] () {
allocate_scheduling_group_specific_data(sg, key_id);
return with_scheduling_group(sg, [this, key_id, sg, cfgp = std::move(cfgp)] () {
auto& this_sg = get_sg_data(sg);
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfgp);
});
}
}
@@ -4974,22 +4960,9 @@
on_fatal_internal_error(seastar_logger, format("Invalid scheduling_group {}", sg._id));
}
return with_scheduling_group(sg, [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
void* val = this_sg.specific_vals[key_id];
if (val) {
if (cfg.destructor) {
cfg.destructor(val);
}
free(val);
this_sg.specific_vals[key_id] = nullptr;
}
}
get_sg_data(sg).specific_vals.clear();
}).then( [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
this_sg.queue_is_initialized = false;
get_sg_data(sg).queue_is_initialized = false;
_task_queues[sg._id].reset();
});

17 changes: 17 additions & 0 deletions tests/unit/scheduling_group_test.cc
@@ -375,6 +375,23 @@ SEASTAR_THREAD_TEST_CASE(sg_create_and_key_create_in_parallel) {
}
}

SEASTAR_THREAD_TEST_CASE(sg_key_constructor_exception_when_creating_new_key) {
scheduling_group_key_config key_conf = make_scheduling_group_key_config<int>();
scheduling_group_key_create(key_conf).get();

struct thrower {
thrower() {
throw std::runtime_error("constructor failed");
}
~thrower() {
// Shouldn't get here because the constructor shouldn't succeed
BOOST_ASSERT(false);
}
};
scheduling_group_key_config thrower_conf = make_scheduling_group_key_config<thrower>();
BOOST_REQUIRE_THROW(scheduling_group_key_create(thrower_conf).get(), std::runtime_error);
}
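
For contrast, a hypothetical success-path case using the same public API (not part of this PR; assumes the int value is zero-initialized by the default constructor config):

    SEASTAR_THREAD_TEST_CASE(sg_key_success_path_sketch) {
        auto conf = make_scheduling_group_key_config<int>();
        auto key = scheduling_group_key_create(conf).get();
        // each scheduling group now owns a default-constructed int;
        // bump the one belonging to the current group
        scheduling_group_get_specific<int>(key) += 1;
        BOOST_REQUIRE_EQUAL(scheduling_group_get_specific<int>(key), 1);
    }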

SEASTAR_THREAD_TEST_CASE(sg_create_with_destroy_tasks) {
struct nada{};
