Skip to content

Commit

Permalink
support more config (#353)
Browse files Browse the repository at this point in the history
  • Loading branch information
loongs-zhang authored Dec 24, 2024
2 parents 6ae9409 + ac75526 commit b228729
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 46 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ jobs:
aarch64-apple-darwin,

x86_64-pc-windows-gnu,
i686-pc-windows-gnu,
# i686-pc-windows-gnu,
x86_64-pc-windows-msvc,
i686-pc-windows-msvc,
# i686-pc-windows-msvc,
]
channel: [ 1.81.0, nightly-2024-08-02 ]
include:
Expand Down
3 changes: 0 additions & 3 deletions core/src/common/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ pub const COROUTINE_GLOBAL_QUEUE_BEAN: &str = "coroutineGlobalQueueBean";
/// Task global queue bean name.
pub const TASK_GLOBAL_QUEUE_BEAN: &str = "taskGlobalQueueBean";

/// Stack pool bean name.
pub const STACK_POOL_BEAN: &str = "stackPoolBean";

/// Monitor bean name.
pub const MONITOR_BEAN: &str = "monitorBean";

Expand Down
40 changes: 38 additions & 2 deletions core/src/net/config.rs → core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,27 @@ pub struct Config {
min_size: usize,
max_size: usize,
keep_alive_time: u64,
min_memory_count: usize,
memory_keep_alive_time: u64,
hook: bool,
}

impl Config {
#[must_use]
pub fn single() -> Self {
Self::new(1, DEFAULT_STACK_SIZE, 0, 65536, 0, true)
Self::new(1, DEFAULT_STACK_SIZE, 0, 65536, 0, 0, 10_000_000_000, true)
}

#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn new(
event_loop_size: usize,
stack_size: usize,
min_size: usize,
max_size: usize,
keep_alive_time: u64,
min_memory_count: usize,
memory_keep_alive_time: u64,
hook: bool,
) -> Self {
Self {
Expand All @@ -32,6 +37,8 @@ impl Config {
min_size,
max_size,
keep_alive_time,
min_memory_count,
memory_keep_alive_time,
hook,
}
}
Expand Down Expand Up @@ -61,6 +68,16 @@ impl Config {
self.keep_alive_time
}

#[must_use]
pub fn min_memory_count(&self) -> usize {
self.min_memory_count
}

#[must_use]
pub fn memory_keep_alive_time(&self) -> u64 {
self.memory_keep_alive_time
}

#[must_use]
pub fn hook(&self) -> bool {
self.hook
Expand Down Expand Up @@ -101,6 +118,16 @@ impl Config {
self
}

pub fn set_min_memory_count(&mut self, min_memory_count: usize) -> &mut Self {
self.min_memory_count = min_memory_count;
self
}

pub fn set_memory_keep_alive_time(&mut self, memory_keep_alive_time: u64) -> &mut Self {
self.memory_keep_alive_time = memory_keep_alive_time;
self
}

pub fn set_hook(&mut self, hook: bool) -> &mut Self {
self.hook = hook;
self
Expand All @@ -109,6 +136,15 @@ impl Config {

impl Default for Config {
fn default() -> Self {
Self::new(cpu_count(), DEFAULT_STACK_SIZE, 0, 65536, 0, true)
Self::new(
cpu_count(),
DEFAULT_STACK_SIZE,
0,
65536,
0,
0,
10_000_000_000,
true,
)
}
}
8 changes: 3 additions & 5 deletions core/src/coroutine/korosensei.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::catch;
use crate::common::constants::CoroutineState;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::coroutine::stack_pool::{PooledStack, StackPool};
use crate::coroutine::stack_pool::{MemoryPool, PooledStack};
use crate::coroutine::suspender::Suspender;
use crate::coroutine::StackInfo;
use corosensei::stack::Stack;
Expand All @@ -29,7 +29,6 @@ pub struct Coroutine<'c, Param, Yield, Return> {
pub(crate) name: String,
inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, PooledStack>,
pub(crate) state: Cell<CoroutineState<Yield, Return>>,
pub(crate) stack_size: usize,
pub(crate) stack_infos: RefCell<VecDeque<StackInfo>>,
pub(crate) listeners: VecDeque<&'c dyn Listener<Yield, Return>>,
pub(crate) local: CoroutineLocal<'c>,
Expand Down Expand Up @@ -308,7 +307,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
stack_size: usize,
callback: F,
) -> std::io::Result<R> {
let stack_pool = StackPool::get_instance();
let stack_pool = MemoryPool::get_instance();
if let Some(co) = Self::current() {
let remaining_stack = unsafe { co.remaining_stack() };
if remaining_stack >= red_zone {
Expand Down Expand Up @@ -380,7 +379,7 @@ where
F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
{
let stack_size = stack_size.max(crate::common::page_size());
let stack = StackPool::get_instance().allocate(stack_size)?;
let stack = MemoryPool::get_instance().allocate(stack_size)?;
let stack_infos = RefCell::new(VecDeque::from([StackInfo {
stack_top: stack.base().get(),
stack_bottom: stack.limit().get(),
Expand All @@ -403,7 +402,6 @@ where
let mut co = Coroutine {
name,
inner,
stack_size,
stack_infos,
state: Cell::new(CoroutineState::Ready),
listeners: VecDeque::default(),
Expand Down
12 changes: 7 additions & 5 deletions core/src/coroutine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::common::constants::CoroutineState;
use crate::common::ordered_work_steal::Ordered;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::{impl_current_for, impl_display_by_debug, impl_for_named};
Expand All @@ -16,10 +17,11 @@ pub mod local;
/// Coroutine listener abstraction and impl.
pub mod listener;

use crate::common::ordered_work_steal::Ordered;
/// Reuse stacks.
pub mod stack_pool;

#[cfg(feature = "korosensei")]
pub use korosensei::Coroutine;

#[cfg(feature = "korosensei")]
mod korosensei;

Expand Down Expand Up @@ -76,8 +78,6 @@ pub struct StackInfo {
/// Coroutine state abstraction and impl.
mod state;

pub(crate) mod stack_pool;

impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
/// Get the name of this coroutine.
pub fn name(&self) -> &str {
Expand Down Expand Up @@ -201,8 +201,10 @@ where
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Coroutine")
.field("name", &self.name())
.field("status", &self.state())
.field("state", &self.state())
.field("stack_infos", &self.stack_infos)
.field("local", &self.local)
.field("priority", &self.priority)
.finish()
}
}
Expand Down
52 changes: 33 additions & 19 deletions core/src/coroutine/stack_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::beans::BeanFactory;
use crate::common::constants::STACK_POOL_BEAN;
use crate::common::now;
use crate::config::Config;
use corosensei::stack::{DefaultStack, Stack, StackPointer};
use once_cell::sync::OnceCell;
use std::cell::UnsafeCell;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
Expand Down Expand Up @@ -138,35 +138,49 @@ impl PooledStack {
}
}

pub(crate) struct StackPool {
static STACK_POOL: OnceCell<MemoryPool> = OnceCell::new();

/// A memory pool for reusing stacks.
#[derive(educe::Educe)]
#[educe(Debug)]
pub struct MemoryPool {
#[educe(Debug(ignore))]
pool: UnsafeCell<BinaryHeap<PooledStack>>,
len: AtomicUsize,
//最小内存数,即核心内存数
min_size: AtomicUsize,
min_count: AtomicUsize,
//非核心内存的最大存活时间,单位ns
keep_alive_time: AtomicU64,
}

unsafe impl Send for StackPool {}
unsafe impl Send for MemoryPool {}

unsafe impl Sync for StackPool {}
unsafe impl Sync for MemoryPool {}

impl Default for StackPool {
impl Default for MemoryPool {
fn default() -> Self {
Self::new(0, 10_000_000_000)
}
}

impl StackPool {
impl MemoryPool {
/// Init the `MemoryPool`.
pub fn init(config: &Config) -> Result<(), MemoryPool> {
STACK_POOL.set(MemoryPool::new(
config.min_memory_count(),
config.memory_keep_alive_time(),
))
}

pub(crate) fn get_instance<'m>() -> &'m Self {
BeanFactory::get_or_default(STACK_POOL_BEAN)
STACK_POOL.get_or_init(MemoryPool::default)
}

pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self {
pub(crate) fn new(min_count: usize, keep_alive_time: u64) -> Self {
Self {
pool: UnsafeCell::new(BinaryHeap::default()),
len: AtomicUsize::default(),
min_size: AtomicUsize::new(min_size),
min_count: AtomicUsize::new(min_count),
keep_alive_time: AtomicU64::new(keep_alive_time),
}
}
Expand Down Expand Up @@ -194,7 +208,7 @@ impl StackPool {
stack.update_stack_teb_fields();
return Ok(stack);
}
if self.min_size() < self.len()
if self.min_count() < self.len()
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
{
// clean the expired stack
Expand All @@ -221,13 +235,13 @@ impl StackPool {
}

#[allow(dead_code)]
pub(crate) fn set_min_size(&self, min_size: usize) {
self.min_size
.store(min_size, std::sync::atomic::Ordering::Release);
pub(crate) fn set_min_count(&self, min_count: usize) {
self.min_count
.store(min_count, std::sync::atomic::Ordering::Release);
}

pub(crate) fn min_size(&self) -> usize {
self.min_size.load(std::sync::atomic::Ordering::Acquire)
pub(crate) fn min_count(&self) -> usize {
self.min_count.load(std::sync::atomic::Ordering::Acquire)
}

pub(crate) fn len(&self) -> usize {
Expand Down Expand Up @@ -264,7 +278,7 @@ impl StackPool {
}
}
for stack in maybe_free {
if self.min_size() < self.len()
if self.min_count() < self.len()
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
{
// free the stack
Expand All @@ -283,7 +297,7 @@ mod tests {

#[test]
fn test_stack_pool() -> std::io::Result<()> {
let pool = StackPool::default();
let pool = MemoryPool::default();
let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
assert_eq!(Rc::strong_count(&stack.stack), 2);
drop(stack);
Expand Down
4 changes: 4 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
/// Common traits and impl.
pub mod common;

/// Configuration for `EventLoops`.
#[allow(missing_docs)]
pub mod config;

/// Coroutine impls.
pub mod coroutine;

Expand Down
4 changes: 2 additions & 2 deletions core/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::common::constants::{CoroutineState, MONITOR_BEAN};
use crate::common::{get_timeout_time, now, CondvarBlocker};
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::coroutine::stack_pool::StackPool;
use crate::coroutine::stack_pool::MemoryPool;
use crate::scheduler::SchedulableSuspender;
use crate::{catch, error, impl_current_for, impl_display_by_debug, info};
use nix::sys::pthread::{pthread_kill, pthread_self, Pthread};
Expand Down Expand Up @@ -137,7 +137,7 @@ impl Monitor {
);
}
}
StackPool::get_instance().clean();
MemoryPool::get_instance().clean();
//monitor线程不执行协程计算任务,每次循环至少wait 1ms
monitor.blocker.clone().block(Duration::from_millis(1));
}
Expand Down
6 changes: 1 addition & 5 deletions core/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::Config;
use crate::coroutine::suspender::Suspender;
use crate::net::config::Config;
use crate::net::event_loop::EventLoop;
use crate::net::join::JoinHandle;
use crate::{error, info};
Expand Down Expand Up @@ -30,10 +30,6 @@ mod operator;
#[allow(missing_docs)]
pub mod event_loop;

/// Configuration for `EventLoops`.
#[allow(missing_docs)]
pub mod config;

/// Task join abstraction and impl.
pub mod join;

Expand Down
6 changes: 5 additions & 1 deletion hook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
use once_cell::sync::OnceCell;
use open_coroutine_core::co_pool::task::UserTaskFunc;
use open_coroutine_core::net::config::Config;
use open_coroutine_core::config::Config;
use open_coroutine_core::coroutine::stack_pool::MemoryPool;
use open_coroutine_core::net::join::JoinHandle;
use open_coroutine_core::net::{EventLoops, UserFunc};
use open_coroutine_core::scheduler::SchedulableCoroutine;
Expand All @@ -75,6 +76,9 @@ pub mod syscall;
/// Start the framework.
#[no_mangle]
pub extern "C" fn open_coroutine_init(config: Config) -> c_int {
if MemoryPool::init(&config).is_err() {
return -1;
}
EventLoops::init(&config);
_ = HOOK.get_or_init(|| config.hook());
0
Expand Down
Loading

0 comments on commit b228729

Please sign in to comment.