-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
scheduling_group: improve scheduling group creation exception safety #2617
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,17 +37,66 @@ namespace seastar { | |
namespace internal { | ||
|
||
struct scheduling_group_specific_thread_local_data { | ||
using val_ptr = std::unique_ptr<void, void (*)(void*) noexcept>; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the point in making it a smart pointer if you track construction/destruction by hand anyway? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The point is to manage the dynamic memory allocation and free it automatically |
||
using cfg_ptr = lw_shared_ptr<scheduling_group_key_config>; | ||
|
||
struct specific_val { | ||
val_ptr valp; | ||
cfg_ptr cfg; | ||
|
||
specific_val() : valp(nullptr, &free), cfg(nullptr) {} | ||
xemul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
specific_val(val_ptr&& valp_, const cfg_ptr& cfg_) : valp(std::move(valp_)), cfg(cfg_) { | ||
xemul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (valp && cfg->constructor) { | ||
cfg->constructor(valp.get()); | ||
} | ||
} | ||
|
||
~specific_val() { | ||
if (valp && cfg->destructor) { | ||
cfg->destructor(valp.get()); | ||
} | ||
} | ||
|
||
specific_val(const specific_val& other) = delete; | ||
specific_val& operator=(const specific_val& other) = delete; | ||
|
||
specific_val(specific_val&& other) : valp(std::move(other.valp)), cfg(std::move(other.cfg)) {} | ||
|
||
specific_val& operator=(specific_val&& other) { | ||
if (this != &other) { | ||
valp = std::move(other.valp); | ||
cfg = std::move(other.cfg); | ||
} | ||
return *this; | ||
} | ||
|
||
void* get() { return valp.get(); } | ||
|
||
void rename() { | ||
if (valp && cfg->rename) { | ||
cfg->rename(valp.get()); | ||
} | ||
} | ||
}; | ||
|
||
struct per_scheduling_group { | ||
bool queue_is_initialized = false; | ||
/** | ||
* This array holds pointers to the scheduling group specific | ||
* data. The pointer is not use as is but is cast to a reference | ||
* to the appropriate type that is actually pointed to. | ||
*/ | ||
std::vector<void*> specific_vals; | ||
std::vector<specific_val> specific_vals; | ||
|
||
void rename() { | ||
for (auto& v : specific_vals) { | ||
v.rename(); | ||
} | ||
} | ||
}; | ||
std::array<per_scheduling_group, max_scheduling_groups()> per_scheduling_group_data; | ||
std::map<unsigned long, scheduling_group_key_config> scheduling_group_key_configs; | ||
std::map<unsigned long, cfg_ptr> scheduling_group_key_configs; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It looks like this map is no longer needed? The very same config can be obtained via the `cfg` pointer stored in each `specific_val`. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's still used in |
||
}; | ||
|
||
#ifdef SEASTAR_BUILD_SHARED_LIBS | ||
|
@@ -78,12 +127,12 @@ template<typename T> | |
T* scheduling_group_get_specific_ptr(scheduling_group sg, scheduling_group_key key) noexcept { | ||
auto& data = internal::get_scheduling_group_specific_thread_local_data(); | ||
#ifdef SEASTAR_DEBUG | ||
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index); | ||
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index); | ||
#endif | ||
auto sg_id = internal::scheduling_group_index(sg); | ||
if (__builtin_expect(sg_id < data.per_scheduling_group_data.size() && | ||
data.per_scheduling_group_data[sg_id].queue_is_initialized, true)) { | ||
return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]); | ||
return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get()); | ||
} | ||
return nullptr; | ||
} | ||
|
@@ -123,9 +172,9 @@ T& scheduling_group_get_specific(scheduling_group_key key) noexcept { | |
// return a reference to an element whose queue_is_initialized is | ||
// false. | ||
auto& data = internal::get_scheduling_group_specific_thread_local_data(); | ||
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index); | ||
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index); | ||
auto sg_id = internal::scheduling_group_index(current_scheduling_group()); | ||
return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]); | ||
return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get()); | ||
} | ||
|
||
/** | ||
|
@@ -155,7 +204,7 @@ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer, | |
auto wrapped_mapper = [key, mapper] (per_scheduling_group& psg) { | ||
auto id = internal::scheduling_group_key_id(key); | ||
return make_ready_future<typename function_traits<Mapper>::return_type> | ||
(mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id]))); | ||
(mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get()))); | ||
}; | ||
|
||
return map_reduce( | ||
|
@@ -188,7 +237,7 @@ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, schedulin | |
|
||
auto mapper = [key] (per_scheduling_group& psg) { | ||
auto id = internal::scheduling_group_key_id(key); | ||
return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id])); | ||
return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get())); | ||
}; | ||
|
||
return map_reduce( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1066,23 +1066,12 @@ reactor::~reactor() { | |
eraser(_expired_timers); | ||
eraser(_expired_lowres_timers); | ||
eraser(_expired_manual_timers); | ||
auto& sg_data = _scheduling_group_specific_data; | ||
for (auto&& tq : _task_queues) { | ||
if (tq) { | ||
auto& this_sg = sg_data.per_scheduling_group_data[tq->_id]; | ||
// The following line will preserve the convention that constructor and destructor functions | ||
// for the per sg values are called in the context of the containing scheduling group. | ||
*internal::current_scheduling_group_ptr() = scheduling_group(tq->_id); | ||
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) { | ||
void* val = this_sg.specific_vals[key_id]; | ||
if (val) { | ||
if (cfg.destructor) { | ||
cfg.destructor(val); | ||
} | ||
free(val); | ||
this_sg.specific_vals[key_id] = nullptr; | ||
} | ||
} | ||
get_sg_data(tq->_id).specific_vals.clear(); | ||
} | ||
} | ||
} | ||
|
@@ -4895,48 +4884,41 @@ deallocate_scheduling_group_id(unsigned id) noexcept { | |
s_used_scheduling_group_ids_bitmap.fetch_and(~(1ul << id), std::memory_order_relaxed); | ||
} | ||
|
||
void | ||
reactor::allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id) { | ||
auto& sg_data = _scheduling_group_specific_data; | ||
auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | ||
const auto& cfg = sg_data.scheduling_group_key_configs[key_id]; | ||
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1)); | ||
this_sg.specific_vals[key_id] = aligned_alloc(cfg.alignment, cfg.allocation_size); | ||
if (!this_sg.specific_vals[key_id]) { | ||
std::abort(); | ||
} | ||
if (cfg.constructor) { | ||
cfg.constructor(this_sg.specific_vals[key_id]); | ||
static | ||
internal::scheduling_group_specific_thread_local_data::specific_val | ||
allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const lw_shared_ptr<scheduling_group_key_config>& cfg) { | ||
using val_ptr = internal::scheduling_group_specific_thread_local_data::val_ptr; | ||
using specific_val = internal::scheduling_group_specific_thread_local_data::specific_val; | ||
|
||
val_ptr valp(aligned_alloc(cfg->alignment, cfg->allocation_size), &free); | ||
xemul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (!valp) { | ||
throw std::runtime_error("memory allocation failed"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It was `std::abort()` before — why the change? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Because the intention of the patch is to improve error handling and allow the node to continue to function in the face of unexpected errors |
||
} | ||
return specific_val(std::move(valp), cfg); | ||
} | ||
|
||
future<> | ||
reactor::rename_scheduling_group_specific_data(scheduling_group sg) { | ||
return with_shared(_scheduling_group_keys_mutex, [this, sg] { | ||
return with_scheduling_group(sg, [this, sg] { | ||
auto& sg_data = _scheduling_group_specific_data; | ||
auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | ||
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) { | ||
if (cfg.rename) { | ||
(cfg.rename)(this_sg.specific_vals[key_id]); | ||
} | ||
} | ||
get_sg_data(sg).rename(); | ||
}); | ||
}); | ||
} | ||
|
||
future<> | ||
reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstring shortname, float shares) { | ||
return with_shared(_scheduling_group_keys_mutex, [this, sg, name = std::move(name), shortname = std::move(shortname), shares] { | ||
auto& sg_data = _scheduling_group_specific_data; | ||
auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | ||
this_sg.queue_is_initialized = true; | ||
get_sg_data(sg).queue_is_initialized = true; | ||
_task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1)); | ||
_task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shortname, shares); | ||
|
||
return with_scheduling_group(sg, [this, sg, &sg_data] () { | ||
return with_scheduling_group(sg, [this, sg] () { | ||
auto& sg_data = _scheduling_group_specific_data; | ||
auto& this_sg = get_sg_data(sg); | ||
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) { | ||
allocate_scheduling_group_specific_data(sg, key_id); | ||
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1)); | ||
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg); | ||
} | ||
}); | ||
}); | ||
|
@@ -4945,21 +4927,25 @@ reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstri | |
future<> | ||
reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) { | ||
return with_lock(_scheduling_group_keys_mutex, [this, key, cfg] { | ||
auto& sg_data = _scheduling_group_specific_data; | ||
auto key_id = internal::scheduling_group_key_id(key); | ||
sg_data.scheduling_group_key_configs[key_id] = cfg; | ||
return parallel_for_each(_task_queues, [this, cfg, key_id] (std::unique_ptr<task_queue>& tq) { | ||
auto cfgp = make_lw_shared<scheduling_group_key_config>(std::move(cfg)); | ||
_scheduling_group_specific_data.scheduling_group_key_configs[key_id] = cfgp; | ||
return parallel_for_each(_task_queues, [this, cfgp, key_id] (std::unique_ptr<task_queue>& tq) { | ||
if (tq) { | ||
scheduling_group sg = scheduling_group(tq->_id); | ||
if (tq.get() == _at_destroy_tasks) { | ||
// fake the group by assuming it here | ||
auto curr = current_scheduling_group(); | ||
auto cleanup = defer([curr] () noexcept { *internal::current_scheduling_group_ptr() = curr; }); | ||
*internal::current_scheduling_group_ptr() = sg; | ||
allocate_scheduling_group_specific_data(sg, key_id); | ||
auto& this_sg = get_sg_data(sg); | ||
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1)); | ||
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfgp); | ||
} else { | ||
return with_scheduling_group(sg, [this, key_id, sg] () { | ||
allocate_scheduling_group_specific_data(sg, key_id); | ||
return with_scheduling_group(sg, [this, key_id, sg, cfgp = std::move(cfgp)] () { | ||
auto& this_sg = get_sg_data(sg); | ||
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1)); | ||
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfgp); | ||
}); | ||
} | ||
} | ||
|
@@ -4974,22 +4960,9 @@ reactor::destroy_scheduling_group(scheduling_group sg) noexcept { | |
on_fatal_internal_error(seastar_logger, format("Invalid scheduling_group {}", sg._id)); | ||
} | ||
return with_scheduling_group(sg, [this, sg] () { | ||
auto& sg_data = _scheduling_group_specific_data; | ||
auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | ||
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) { | ||
void* val = this_sg.specific_vals[key_id]; | ||
if (val) { | ||
if (cfg.destructor) { | ||
cfg.destructor(val); | ||
} | ||
free(val); | ||
this_sg.specific_vals[key_id] = nullptr; | ||
} | ||
} | ||
get_sg_data(sg).specific_vals.clear(); | ||
}).then( [this, sg] () { | ||
auto& sg_data = _scheduling_group_specific_data; | ||
auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | ||
this_sg.queue_is_initialized = false; | ||
get_sg_data(sg).queue_is_initialized = false; | ||
_task_queues[sg._id].reset(); | ||
}); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need in those helpers, AFAICS, the existing
get_scheduling_group_specific_thread_local_data()
(and Co) already provide access to the array of per_scheduling_group_data-sThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added it because in several places we access the sg data and we write something like `_scheduling_group_specific_data.per_scheduling_group_data[sg._id]` [snippet lost in extraction — presumably this expression], which I thought is a little cumbersome, and I didn't find another method for this