From ac7552649a6a2ed15f6e066d3252564c550377e6 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Tue, 24 Dec 2024 10:18:53 +0800 Subject: [PATCH] support more config --- .github/workflows/ci.yml | 4 +-- core/src/common/constants.rs | 3 -- core/src/{net => }/config.rs | 40 ++++++++++++++++++++++-- core/src/coroutine/korosensei.rs | 8 ++--- core/src/coroutine/mod.rs | 12 +++++--- core/src/coroutine/stack_pool.rs | 52 ++++++++++++++++++++------------ core/src/lib.rs | 4 +++ core/src/monitor.rs | 4 +-- core/src/net/mod.rs | 6 +--- hook/src/lib.rs | 6 +++- macros/src/lib.rs | 12 ++++++++ open-coroutine/src/lib.rs | 4 +-- 12 files changed, 109 insertions(+), 46 deletions(-) rename core/src/{net => }/config.rs (70%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9c3a7a04..dd1f3719 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -82,9 +82,9 @@ jobs: aarch64-apple-darwin, x86_64-pc-windows-gnu, - i686-pc-windows-gnu, +# i686-pc-windows-gnu, x86_64-pc-windows-msvc, - i686-pc-windows-msvc, +# i686-pc-windows-msvc, ] channel: [ 1.81.0, nightly-2024-08-02 ] include: diff --git a/core/src/common/constants.rs b/core/src/common/constants.rs index e8b0079b..dc62031d 100644 --- a/core/src/common/constants.rs +++ b/core/src/common/constants.rs @@ -15,9 +15,6 @@ pub const COROUTINE_GLOBAL_QUEUE_BEAN: &str = "coroutineGlobalQueueBean"; /// Task global queue bean name. pub const TASK_GLOBAL_QUEUE_BEAN: &str = "taskGlobalQueueBean"; -/// Stack pool bean name. -pub const STACK_POOL_BEAN: &str = "stackPoolBean"; - /// Monitor bean name. pub const MONITOR_BEAN: &str = "monitorBean"; diff --git a/core/src/net/config.rs b/core/src/config.rs similarity index 70% rename from core/src/net/config.rs rename to core/src/config.rs index 0efc0839..eb3c8e75 100644 --- a/core/src/net/config.rs +++ b/core/src/config.rs @@ -8,15 +8,18 @@ pub struct Config { min_size: usize, max_size: usize, keep_alive_time: u64, + min_memory_count: usize, + memory_keep_alive_time: u64, hook: bool, } impl Config { #[must_use] pub fn single() -> Self { - Self::new(1, DEFAULT_STACK_SIZE, 0, 65536, 0, true) + Self::new(1, DEFAULT_STACK_SIZE, 0, 65536, 0, 0, 10_000_000_000, true) } + #[allow(clippy::too_many_arguments)] #[must_use] pub fn new( event_loop_size: usize, @@ -24,6 +27,8 @@ impl Config { min_size: usize, max_size: usize, keep_alive_time: u64, + min_memory_count: usize, + memory_keep_alive_time: u64, hook: bool, ) -> Self { Self { @@ -32,6 +37,8 @@ impl Config { min_size, max_size, keep_alive_time, + min_memory_count, + memory_keep_alive_time, hook, } } @@ -61,6 +68,16 @@ impl Config { self.keep_alive_time } + #[must_use] + pub fn min_memory_count(&self) -> usize { + self.min_memory_count + } + + #[must_use] + pub fn memory_keep_alive_time(&self) -> u64 { + self.memory_keep_alive_time + } + #[must_use] pub fn hook(&self) -> bool { self.hook @@ -101,6 +118,16 @@ impl Config { self } + pub fn set_min_memory_count(&mut self, min_memory_count: usize) -> &mut Self { + self.min_memory_count = min_memory_count; + self + } + + pub fn set_memory_keep_alive_time(&mut self, memory_keep_alive_time: u64) -> &mut Self { + self.memory_keep_alive_time = memory_keep_alive_time; + self + } + pub fn set_hook(&mut self, hook: bool) -> &mut Self { self.hook = hook; self @@ -109,6 +136,15 @@ impl Config { impl Default for Config { fn default() -> Self { - Self::new(cpu_count(), DEFAULT_STACK_SIZE, 0, 65536, 0, true) + Self::new( + cpu_count(), + DEFAULT_STACK_SIZE, + 0, + 65536, + 0, + 0, + 10_000_000_000, + true, + ) } } diff --git a/core/src/coroutine/korosensei.rs b/core/src/coroutine/korosensei.rs index fdc5c861..45381406 100644 --- a/core/src/coroutine/korosensei.rs +++ b/core/src/coroutine/korosensei.rs @@ -2,7 +2,7 @@ use crate::catch; use crate::common::constants::CoroutineState; use crate::coroutine::listener::Listener; use crate::coroutine::local::CoroutineLocal; -use crate::coroutine::stack_pool::{PooledStack, StackPool}; +use crate::coroutine::stack_pool::{MemoryPool, PooledStack}; use crate::coroutine::suspender::Suspender; use crate::coroutine::StackInfo; use corosensei::stack::Stack; @@ -29,7 +29,6 @@ pub struct Coroutine<'c, Param, Yield, Return> { pub(crate) name: String, inner: corosensei::Coroutine, PooledStack>, pub(crate) state: Cell>, - pub(crate) stack_size: usize, pub(crate) stack_infos: RefCell>, pub(crate) listeners: VecDeque<&'c dyn Listener>, pub(crate) local: CoroutineLocal<'c>, @@ -308,7 +307,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> { stack_size: usize, callback: F, ) -> std::io::Result { - let stack_pool = StackPool::get_instance(); + let stack_pool = MemoryPool::get_instance(); if let Some(co) = Self::current() { let remaining_stack = unsafe { co.remaining_stack() }; if remaining_stack >= red_zone { @@ -380,7 +379,7 @@ where F: FnOnce(&Suspender, Param) -> Return + 'static, { let stack_size = stack_size.max(crate::common::page_size()); - let stack = StackPool::get_instance().allocate(stack_size)?; + let stack = MemoryPool::get_instance().allocate(stack_size)?; let stack_infos = RefCell::new(VecDeque::from([StackInfo { stack_top: stack.base().get(), stack_bottom: stack.limit().get(), @@ -403,7 +402,6 @@ where let mut co = Coroutine { name, inner, - stack_size, stack_infos, state: Cell::new(CoroutineState::Ready), listeners: VecDeque::default(), diff --git a/core/src/coroutine/mod.rs b/core/src/coroutine/mod.rs index c7711647..fcc6abc8 100644 --- a/core/src/coroutine/mod.rs +++ b/core/src/coroutine/mod.rs @@ -1,4 +1,5 @@ use crate::common::constants::CoroutineState; +use crate::common::ordered_work_steal::Ordered; use crate::coroutine::listener::Listener; use crate::coroutine::local::CoroutineLocal; use crate::{impl_current_for, impl_display_by_debug, impl_for_named}; @@ -16,10 +17,11 @@ pub mod local; /// Coroutine listener abstraction and impl. pub mod listener; -use crate::common::ordered_work_steal::Ordered; +/// Reuse stacks. +pub mod stack_pool; + #[cfg(feature = "korosensei")] pub use korosensei::Coroutine; - #[cfg(feature = "korosensei")] mod korosensei; @@ -76,8 +78,6 @@ pub struct StackInfo { /// Coroutine state abstraction and impl. mod state; -pub(crate) mod stack_pool; - impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> { /// Get the name of this coroutine. pub fn name(&self) -> &str { @@ -201,8 +201,10 @@ where fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("Coroutine") .field("name", &self.name()) - .field("status", &self.state()) + .field("state", &self.state()) + .field("stack_infos", &self.stack_infos) .field("local", &self.local) + .field("priority", &self.priority) .finish() } } diff --git a/core/src/coroutine/stack_pool.rs b/core/src/coroutine/stack_pool.rs index 22a39fbd..a756e0c8 100644 --- a/core/src/coroutine/stack_pool.rs +++ b/core/src/coroutine/stack_pool.rs @@ -1,7 +1,7 @@ -use crate::common::beans::BeanFactory; -use crate::common::constants::STACK_POOL_BEAN; use crate::common::now; +use crate::config::Config; use corosensei::stack::{DefaultStack, Stack, StackPointer}; +use once_cell::sync::OnceCell; use std::cell::UnsafeCell; use std::cmp::Ordering; use std::collections::BinaryHeap; @@ -138,35 +138,49 @@ impl PooledStack { } } -pub(crate) struct StackPool { +static STACK_POOL: OnceCell = OnceCell::new(); + +/// A memory pool for reusing stacks. +#[derive(educe::Educe)] +#[educe(Debug)] +pub struct MemoryPool { + #[educe(Debug(ignore))] pool: UnsafeCell>, len: AtomicUsize, //最小内存数,即核心内存数 - min_size: AtomicUsize, + min_count: AtomicUsize, //非核心内存的最大存活时间,单位ns keep_alive_time: AtomicU64, } -unsafe impl Send for StackPool {} +unsafe impl Send for MemoryPool {} -unsafe impl Sync for StackPool {} +unsafe impl Sync for MemoryPool {} -impl Default for StackPool { +impl Default for MemoryPool { fn default() -> Self { Self::new(0, 10_000_000_000) } } -impl StackPool { +impl MemoryPool { + /// Init the `MemoryPool`. + pub fn init(config: &Config) -> Result<(), MemoryPool> { + STACK_POOL.set(MemoryPool::new( + config.min_memory_count(), + config.memory_keep_alive_time(), + )) + } + pub(crate) fn get_instance<'m>() -> &'m Self { - BeanFactory::get_or_default(STACK_POOL_BEAN) + STACK_POOL.get_or_init(MemoryPool::default) } - pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self { + pub(crate) fn new(min_count: usize, keep_alive_time: u64) -> Self { Self { pool: UnsafeCell::new(BinaryHeap::default()), len: AtomicUsize::default(), - min_size: AtomicUsize::new(min_size), + min_count: AtomicUsize::new(min_count), keep_alive_time: AtomicU64::new(keep_alive_time), } } @@ -194,7 +208,7 @@ impl StackPool { stack.update_stack_teb_fields(); return Ok(stack); } - if self.min_size() < self.len() + if self.min_count() < self.len() && now() <= stack.create_time.saturating_add(self.keep_alive_time()) { // clean the expired stack @@ -221,13 +235,13 @@ impl StackPool { } #[allow(dead_code)] - pub(crate) fn set_min_size(&self, min_size: usize) { - self.min_size - .store(min_size, std::sync::atomic::Ordering::Release); + pub(crate) fn set_min_count(&self, min_count: usize) { + self.min_count + .store(min_count, std::sync::atomic::Ordering::Release); } - pub(crate) fn min_size(&self) -> usize { - self.min_size.load(std::sync::atomic::Ordering::Acquire) + pub(crate) fn min_count(&self) -> usize { + self.min_count.load(std::sync::atomic::Ordering::Acquire) } pub(crate) fn len(&self) -> usize { @@ -264,7 +278,7 @@ impl StackPool { } } for stack in maybe_free { - if self.min_size() < self.len() + if self.min_count() < self.len() && now() <= stack.create_time.saturating_add(self.keep_alive_time()) { // free the stack @@ -283,7 +297,7 @@ mod tests { #[test] fn test_stack_pool() -> std::io::Result<()> { - let pool = StackPool::default(); + let pool = MemoryPool::default(); let stack = pool.allocate(DEFAULT_STACK_SIZE)?; assert_eq!(Rc::strong_count(&stack.stack), 2); drop(stack); diff --git a/core/src/lib.rs b/core/src/lib.rs index e7c01de6..0606950e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -56,6 +56,10 @@ /// Common traits and impl. pub mod common; +/// Configuration for `EventLoops`. +#[allow(missing_docs)] +pub mod config; + /// Coroutine impls. pub mod coroutine; diff --git a/core/src/monitor.rs b/core/src/monitor.rs index 2da07782..e7d0c09b 100644 --- a/core/src/monitor.rs +++ b/core/src/monitor.rs @@ -3,7 +3,7 @@ use crate::common::constants::{CoroutineState, MONITOR_BEAN}; use crate::common::{get_timeout_time, now, CondvarBlocker}; use crate::coroutine::listener::Listener; use crate::coroutine::local::CoroutineLocal; -use crate::coroutine::stack_pool::StackPool; +use crate::coroutine::stack_pool::MemoryPool; use crate::scheduler::SchedulableSuspender; use crate::{catch, error, impl_current_for, impl_display_by_debug, info}; use nix::sys::pthread::{pthread_kill, pthread_self, Pthread}; @@ -137,7 +137,7 @@ impl Monitor { ); } } - StackPool::get_instance().clean(); + MemoryPool::get_instance().clean(); //monitor线程不执行协程计算任务,每次循环至少wait 1ms monitor.blocker.clone().block(Duration::from_millis(1)); } diff --git a/core/src/net/mod.rs b/core/src/net/mod.rs index 9bbcc28a..cec6b070 100644 --- a/core/src/net/mod.rs +++ b/core/src/net/mod.rs @@ -1,5 +1,5 @@ +use crate::config::Config; use crate::coroutine::suspender::Suspender; -use crate::net::config::Config; use crate::net::event_loop::EventLoop; use crate::net::join::JoinHandle; use crate::{error, info}; @@ -30,10 +30,6 @@ mod operator; #[allow(missing_docs)] pub mod event_loop; -/// Configuration for `EventLoops`. -#[allow(missing_docs)] -pub mod config; - /// Task join abstraction and impl. pub mod join; diff --git a/hook/src/lib.rs b/hook/src/lib.rs index b1701189..f387dec8 100644 --- a/hook/src/lib.rs +++ b/hook/src/lib.rs @@ -49,7 +49,8 @@ use once_cell::sync::OnceCell; use open_coroutine_core::co_pool::task::UserTaskFunc; -use open_coroutine_core::net::config::Config; +use open_coroutine_core::config::Config; +use open_coroutine_core::coroutine::stack_pool::MemoryPool; use open_coroutine_core::net::join::JoinHandle; use open_coroutine_core::net::{EventLoops, UserFunc}; use open_coroutine_core::scheduler::SchedulableCoroutine; @@ -75,6 +76,9 @@ pub mod syscall; /// Start the framework. #[no_mangle] pub extern "C" fn open_coroutine_init(config: Config) -> c_int { + if MemoryPool::init(&config).is_err() { + return -1; + } EventLoops::init(&config); _ = HOOK.get_or_init(|| config.hook()); 0 diff --git a/macros/src/lib.rs b/macros/src/lib.rs index 8b623975..6617c132 100644 --- a/macros/src/lib.rs +++ b/macros/src/lib.rs @@ -59,6 +59,8 @@ pub fn main(args: TokenStream, func: TokenStream) -> TokenStream { let mut min_size = usize::MAX; let mut max_size = usize::MAX; let mut keep_alive_time = u64::MAX; + let mut min_memory_count = usize::MAX; + let mut memory_keep_alive_time = u64::MAX; let mut hook = true; if !args.is_empty() { let tea_parser = syn::meta::parser(|meta| { @@ -72,6 +74,10 @@ pub fn main(args: TokenStream, func: TokenStream) -> TokenStream { max_size = meta.value()?.parse::()?.base10_parse()?; } else if meta.path.is_ident("keep_alive_time") { keep_alive_time = meta.value()?.parse::()?.base10_parse()?; + } else if meta.path.is_ident("min_memory_count") { + min_memory_count = meta.value()?.parse::()?.base10_parse()?; + } else if meta.path.is_ident("memory_keep_alive_time") { + memory_keep_alive_time = meta.value()?.parse::()?.base10_parse()?; } else if meta.path.is_ident("hook") { hook = meta.value()?.parse::()?.value(); } @@ -109,6 +115,12 @@ pub fn main(args: TokenStream, func: TokenStream) -> TokenStream { if #keep_alive_time != u64::MAX { open_coroutine_config.set_keep_alive_time(#keep_alive_time); } + if #min_memory_count != usize::MAX { + open_coroutine_config.set_min_memory_count(#min_memory_count); + } + if #memory_keep_alive_time != u64::MAX { + open_coroutine_config.set_memory_keep_alive_time(#memory_keep_alive_time); + } if #hook != true { open_coroutine_config.set_hook(#hook); } diff --git a/open-coroutine/src/lib.rs b/open-coroutine/src/lib.rs index 31549929..d8b30876 100644 --- a/open-coroutine/src/lib.rs +++ b/open-coroutine/src/lib.rs @@ -49,7 +49,7 @@ use open_coroutine_core::co_pool::task::UserTaskFunc; use open_coroutine_core::common::constants::SLICE; -pub use open_coroutine_core::net::config::Config; +pub use open_coroutine_core::config::Config; use open_coroutine_core::net::UserFunc; pub use open_coroutine_macros::*; use std::cmp::Ordering; @@ -322,7 +322,7 @@ pub fn connect_timeout(addr: A, timeout: Duration) -> std::io: #[cfg(test)] mod tests { use crate::{init, shutdown}; - use open_coroutine_core::net::config::Config; + use open_coroutine_core::config::Config; #[test] fn test() {