smp: add a function that barriers memory prefault work #2608

Open · wants to merge 1 commit into master
4 changes: 2 additions & 2 deletions include/seastar/core/reactor.hh
@@ -374,14 +374,14 @@ private:
        std::atomic<uint64_t> _pending_signals;
        std::unordered_map<int, signal_handler> _signal_handlers;
    };

    signals _signals;
    std::unique_ptr<thread_pool> _thread_pool;

    friend class thread_pool;
    friend class thread_context;
    friend class internal::cpu_stall_detector;

    friend void handle_signal(int signo, noncopyable_function<void ()>&& handler, bool once);
    friend void barrier_memory_prefault();

    uint64_t pending_task_count() const;
    void run_tasks(task_queue& tq);
8 changes: 8 additions & 0 deletions include/seastar/core/smp.hh
@@ -335,6 +335,7 @@ public:
    void cleanup_cpu();
    void arrive_at_event_loop_end();
    void join_all();
    void join_memory_prefault();
    static bool main_thread() { return std::this_thread::get_id() == _tmain; }

    /// Runs a function on a remote core.
@@ -484,6 +485,13 @@ public:
    static unsigned count;
};

/// Barriers memory prefault background work.
/// This function returns after all memory prefault work is finished.
/// It is safe to call from multiple threads.
///
/// Note: This is a blocking operation, so it should be used with caution.
void barrier_memory_prefault();

SEASTAR_MODULE_EXPORT_END

}
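
A minimal usage sketch, assuming a standard Seastar application (the program below is illustrative and not part of this PR; app_template and make_ready_future are existing Seastar APIs). An application that prefers predictable latency over fast startup can block until prefaulting is done before starting its workload:

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        // Block until every background prefault thread has finished, so the
        // workload that follows is not perturbed by page-fault handling.
        // This blocks the calling reactor thread; do it only during startup.
        seastar::barrier_memory_prefault();
        return seastar::make_ready_future<>();
    });
}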
5 changes: 5 additions & 0 deletions src/core/prefault.hh
@@ -36,11 +36,16 @@ namespace seastar::internal {
class memory_prefaulter {
    std::atomic<bool> _stop_request = false;
    std::vector<posix_thread> _worker_threads;

    std::mutex join_mtx;
    bool joined = false;

    // Keep this in object scope to avoid allocating in worker thread
    std::unordered_map<unsigned, std::vector<memory::internal::memory_range>> _layout_by_node_id;
public:
    explicit memory_prefaulter(const resource::resources& res, memory::internal::numa_layout layout);
    ~memory_prefaulter();
    void join_all();
private:
    void work(std::vector<memory::internal::memory_range>& ranges, size_t page_size, std::optional<size_t> huge_page_size_opt);
};
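
The join_mtx/joined pair above makes join_all() idempotent: joining a posix_thread (like a std::thread) twice is undefined behavior, so the first caller performs the join and every later caller, including the destructor, becomes a no-op that merely waits for the lock. A standalone sketch of the pattern with std::thread (illustrative names, not Seastar code):

#include <mutex>
#include <thread>
#include <vector>

class workers {
    std::vector<std::thread> _threads;
    std::mutex _join_mtx;
    bool _joined = false;
public:
    // Safe to call concurrently and repeatedly: the first caller joins all
    // threads; later callers block on the mutex until the join completes,
    // then see _joined == true and return immediately.
    void join_all() {
        const std::lock_guard<std::mutex> lock(_join_mtx);
        if (!_joined) {
            for (auto& t : _threads) {
                t.join();
            }
            _joined = true;
        }
    }
    ~workers() {
        join_all();
    }
};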
4 changes: 4 additions & 0 deletions src/core/reactor.cc
@@ -4637,6 +4637,10 @@ void smp::configure(const smp_options& smp_opts, const reactor_options& reactor_
    }
}

void barrier_memory_prefault() {
    engine()._smp->join_memory_prefault();
}

bool smp::poll_queues() {
    size_t got = 0;
    for (unsigned i = 0; i < count; i++) {
22 changes: 19 additions & 3 deletions src/core/smp.cc
@@ -179,6 +179,13 @@ smp::setup_prefaulter(const seastar::resource::resources& res, seastar::memory::
#endif
}

void
smp::join_memory_prefault() {
    if (_prefaulter) {
        _prefaulter->join_all();
    }
}

static
std::optional<size_t>
get_huge_page_size() {
@@ -224,11 +231,20 @@ internal::memory_prefaulter::memory_prefaulter(const resource::resources& res, m
    }
}

void
internal::memory_prefaulter::join_all() {
    const std::lock_guard<std::mutex> lock(join_mtx);
    if (!joined) {
        for (auto& t : _worker_threads) {
            t.join();
        }
        joined = true;
    }
}

internal::memory_prefaulter::~memory_prefaulter() {
    _stop_request.store(true, std::memory_order_relaxed);
    for (auto& t : _worker_threads) {
        t.join();
    }
    join_all();
}

void
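
For context on the destructor change: _stop_request is set before joining, so a worker that is still prefaulting notices the flag and exits early instead of touching every remaining page, while barrier_memory_prefault() joins without setting the flag and therefore waits for prefaulting to run to completion. A simplified sketch of what such a worker loop might look like (illustrative only, not the PR's actual work() implementation):

#include <atomic>
#include <cstddef>

// Touch one byte per page so the kernel backs the whole range with memory.
// The stop flag is checked on every iteration so that destruction does not
// have to wait for the entire range to be faulted in.
void prefault_range(volatile char* base, std::size_t len, std::size_t page_size,
                    const std::atomic<bool>& stop_request) {
    for (std::size_t off = 0; off < len; off += page_size) {
        if (stop_request.load(std::memory_order_relaxed)) {
            return;  // shutdown requested: stop prefaulting early
        }
        base[off] = base[off];  // volatile read+write faults the page in
    }
}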