diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9c3a7a04..dd1f3719 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -82,9 +82,9 @@ jobs:
aarch64-apple-darwin,
x86_64-pc-windows-gnu,
- i686-pc-windows-gnu,
+# i686-pc-windows-gnu,
x86_64-pc-windows-msvc,
- i686-pc-windows-msvc,
+# i686-pc-windows-msvc,
]
channel: [ 1.81.0, nightly-2024-08-02 ]
include:
diff --git a/core/src/common/constants.rs b/core/src/common/constants.rs
index e8b0079b..dc62031d 100644
--- a/core/src/common/constants.rs
+++ b/core/src/common/constants.rs
@@ -15,9 +15,6 @@ pub const COROUTINE_GLOBAL_QUEUE_BEAN: &str = "coroutineGlobalQueueBean";
/// Task global queue bean name.
pub const TASK_GLOBAL_QUEUE_BEAN: &str = "taskGlobalQueueBean";
-/// Stack pool bean name.
-pub const STACK_POOL_BEAN: &str = "stackPoolBean";
-
/// Monitor bean name.
pub const MONITOR_BEAN: &str = "monitorBean";
diff --git a/core/src/net/config.rs b/core/src/config.rs
similarity index 70%
rename from core/src/net/config.rs
rename to core/src/config.rs
index 0efc0839..eb3c8e75 100644
--- a/core/src/net/config.rs
+++ b/core/src/config.rs
@@ -8,15 +8,18 @@ pub struct Config {
min_size: usize,
max_size: usize,
keep_alive_time: u64,
+ min_memory_count: usize,
+ memory_keep_alive_time: u64,
hook: bool,
}
impl Config {
#[must_use]
pub fn single() -> Self {
- Self::new(1, DEFAULT_STACK_SIZE, 0, 65536, 0, true)
+ Self::new(1, DEFAULT_STACK_SIZE, 0, 65536, 0, 0, 10_000_000_000, true)
}
+ #[allow(clippy::too_many_arguments)]
#[must_use]
pub fn new(
event_loop_size: usize,
@@ -24,6 +27,8 @@ impl Config {
min_size: usize,
max_size: usize,
keep_alive_time: u64,
+ min_memory_count: usize,
+ memory_keep_alive_time: u64,
hook: bool,
) -> Self {
Self {
@@ -32,6 +37,8 @@ impl Config {
min_size,
max_size,
keep_alive_time,
+ min_memory_count,
+ memory_keep_alive_time,
hook,
}
}
@@ -61,6 +68,16 @@ impl Config {
self.keep_alive_time
}
+ #[must_use]
+ pub fn min_memory_count(&self) -> usize {
+ self.min_memory_count
+ }
+
+ #[must_use]
+ pub fn memory_keep_alive_time(&self) -> u64 {
+ self.memory_keep_alive_time
+ }
+
#[must_use]
pub fn hook(&self) -> bool {
self.hook
@@ -101,6 +118,16 @@ impl Config {
self
}
+ pub fn set_min_memory_count(&mut self, min_memory_count: usize) -> &mut Self {
+ self.min_memory_count = min_memory_count;
+ self
+ }
+
+ pub fn set_memory_keep_alive_time(&mut self, memory_keep_alive_time: u64) -> &mut Self {
+ self.memory_keep_alive_time = memory_keep_alive_time;
+ self
+ }
+
pub fn set_hook(&mut self, hook: bool) -> &mut Self {
self.hook = hook;
self
@@ -109,6 +136,15 @@ impl Config {
impl Default for Config {
fn default() -> Self {
- Self::new(cpu_count(), DEFAULT_STACK_SIZE, 0, 65536, 0, true)
+ Self::new(
+ cpu_count(),
+ DEFAULT_STACK_SIZE,
+ 0,
+ 65536,
+ 0,
+ 0,
+ 10_000_000_000,
+ true,
+ )
}
}
diff --git a/core/src/coroutine/korosensei.rs b/core/src/coroutine/korosensei.rs
index fdc5c861..45381406 100644
--- a/core/src/coroutine/korosensei.rs
+++ b/core/src/coroutine/korosensei.rs
@@ -2,7 +2,7 @@ use crate::catch;
use crate::common::constants::CoroutineState;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
-use crate::coroutine::stack_pool::{PooledStack, StackPool};
+use crate::coroutine::stack_pool::{MemoryPool, PooledStack};
use crate::coroutine::suspender::Suspender;
use crate::coroutine::StackInfo;
use corosensei::stack::Stack;
@@ -29,7 +29,6 @@ pub struct Coroutine<'c, Param, Yield, Return> {
pub(crate) name: String,
inner: corosensei::Coroutine, PooledStack>,
pub(crate) state: Cell>,
- pub(crate) stack_size: usize,
pub(crate) stack_infos: RefCell>,
pub(crate) listeners: VecDeque<&'c dyn Listener>,
pub(crate) local: CoroutineLocal<'c>,
@@ -308,7 +307,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
stack_size: usize,
callback: F,
) -> std::io::Result {
- let stack_pool = StackPool::get_instance();
+ let stack_pool = MemoryPool::get_instance();
if let Some(co) = Self::current() {
let remaining_stack = unsafe { co.remaining_stack() };
if remaining_stack >= red_zone {
@@ -380,7 +379,7 @@ where
F: FnOnce(&Suspender, Param) -> Return + 'static,
{
let stack_size = stack_size.max(crate::common::page_size());
- let stack = StackPool::get_instance().allocate(stack_size)?;
+ let stack = MemoryPool::get_instance().allocate(stack_size)?;
let stack_infos = RefCell::new(VecDeque::from([StackInfo {
stack_top: stack.base().get(),
stack_bottom: stack.limit().get(),
@@ -403,7 +402,6 @@ where
let mut co = Coroutine {
name,
inner,
- stack_size,
stack_infos,
state: Cell::new(CoroutineState::Ready),
listeners: VecDeque::default(),
diff --git a/core/src/coroutine/mod.rs b/core/src/coroutine/mod.rs
index c7711647..fcc6abc8 100644
--- a/core/src/coroutine/mod.rs
+++ b/core/src/coroutine/mod.rs
@@ -1,4 +1,5 @@
use crate::common::constants::CoroutineState;
+use crate::common::ordered_work_steal::Ordered;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::{impl_current_for, impl_display_by_debug, impl_for_named};
@@ -16,10 +17,11 @@ pub mod local;
/// Coroutine listener abstraction and impl.
pub mod listener;
-use crate::common::ordered_work_steal::Ordered;
+/// Reuse stacks.
+pub mod stack_pool;
+
#[cfg(feature = "korosensei")]
pub use korosensei::Coroutine;
-
#[cfg(feature = "korosensei")]
mod korosensei;
@@ -76,8 +78,6 @@ pub struct StackInfo {
/// Coroutine state abstraction and impl.
mod state;
-pub(crate) mod stack_pool;
-
impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
/// Get the name of this coroutine.
pub fn name(&self) -> &str {
@@ -201,8 +201,10 @@ where
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Coroutine")
.field("name", &self.name())
- .field("status", &self.state())
+ .field("state", &self.state())
+ .field("stack_infos", &self.stack_infos)
.field("local", &self.local)
+ .field("priority", &self.priority)
.finish()
}
}
diff --git a/core/src/coroutine/stack_pool.rs b/core/src/coroutine/stack_pool.rs
index 22a39fbd..a756e0c8 100644
--- a/core/src/coroutine/stack_pool.rs
+++ b/core/src/coroutine/stack_pool.rs
@@ -1,7 +1,7 @@
-use crate::common::beans::BeanFactory;
-use crate::common::constants::STACK_POOL_BEAN;
use crate::common::now;
+use crate::config::Config;
use corosensei::stack::{DefaultStack, Stack, StackPointer};
+use once_cell::sync::OnceCell;
use std::cell::UnsafeCell;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
@@ -138,35 +138,49 @@ impl PooledStack {
}
}
-pub(crate) struct StackPool {
+static STACK_POOL: OnceCell = OnceCell::new();
+
+/// A memory pool for reusing stacks.
+#[derive(educe::Educe)]
+#[educe(Debug)]
+pub struct MemoryPool {
+ #[educe(Debug(ignore))]
pool: UnsafeCell>,
len: AtomicUsize,
//最小内存数,即核心内存数
- min_size: AtomicUsize,
+ min_count: AtomicUsize,
//非核心内存的最大存活时间,单位ns
keep_alive_time: AtomicU64,
}
-unsafe impl Send for StackPool {}
+unsafe impl Send for MemoryPool {}
-unsafe impl Sync for StackPool {}
+unsafe impl Sync for MemoryPool {}
-impl Default for StackPool {
+impl Default for MemoryPool {
fn default() -> Self {
Self::new(0, 10_000_000_000)
}
}
-impl StackPool {
+impl MemoryPool {
+ /// Init the `MemoryPool`.
+ pub fn init(config: &Config) -> Result<(), MemoryPool> {
+ STACK_POOL.set(MemoryPool::new(
+ config.min_memory_count(),
+ config.memory_keep_alive_time(),
+ ))
+ }
+
pub(crate) fn get_instance<'m>() -> &'m Self {
- BeanFactory::get_or_default(STACK_POOL_BEAN)
+ STACK_POOL.get_or_init(MemoryPool::default)
}
- pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self {
+ pub(crate) fn new(min_count: usize, keep_alive_time: u64) -> Self {
Self {
pool: UnsafeCell::new(BinaryHeap::default()),
len: AtomicUsize::default(),
- min_size: AtomicUsize::new(min_size),
+ min_count: AtomicUsize::new(min_count),
keep_alive_time: AtomicU64::new(keep_alive_time),
}
}
@@ -194,7 +208,7 @@ impl StackPool {
stack.update_stack_teb_fields();
return Ok(stack);
}
- if self.min_size() < self.len()
+ if self.min_count() < self.len()
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
{
// clean the expired stack
@@ -221,13 +235,13 @@ impl StackPool {
}
#[allow(dead_code)]
- pub(crate) fn set_min_size(&self, min_size: usize) {
- self.min_size
- .store(min_size, std::sync::atomic::Ordering::Release);
+ pub(crate) fn set_min_count(&self, min_count: usize) {
+ self.min_count
+ .store(min_count, std::sync::atomic::Ordering::Release);
}
- pub(crate) fn min_size(&self) -> usize {
- self.min_size.load(std::sync::atomic::Ordering::Acquire)
+ pub(crate) fn min_count(&self) -> usize {
+ self.min_count.load(std::sync::atomic::Ordering::Acquire)
}
pub(crate) fn len(&self) -> usize {
@@ -264,7 +278,7 @@ impl StackPool {
}
}
for stack in maybe_free {
- if self.min_size() < self.len()
+ if self.min_count() < self.len()
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
{
// free the stack
@@ -283,7 +297,7 @@ mod tests {
#[test]
fn test_stack_pool() -> std::io::Result<()> {
- let pool = StackPool::default();
+ let pool = MemoryPool::default();
let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
assert_eq!(Rc::strong_count(&stack.stack), 2);
drop(stack);
diff --git a/core/src/lib.rs b/core/src/lib.rs
index e7c01de6..0606950e 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -56,6 +56,10 @@
/// Common traits and impl.
pub mod common;
+/// Configuration for `EventLoops`.
+#[allow(missing_docs)]
+pub mod config;
+
/// Coroutine impls.
pub mod coroutine;
diff --git a/core/src/monitor.rs b/core/src/monitor.rs
index 2da07782..e7d0c09b 100644
--- a/core/src/monitor.rs
+++ b/core/src/monitor.rs
@@ -3,7 +3,7 @@ use crate::common::constants::{CoroutineState, MONITOR_BEAN};
use crate::common::{get_timeout_time, now, CondvarBlocker};
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
-use crate::coroutine::stack_pool::StackPool;
+use crate::coroutine::stack_pool::MemoryPool;
use crate::scheduler::SchedulableSuspender;
use crate::{catch, error, impl_current_for, impl_display_by_debug, info};
use nix::sys::pthread::{pthread_kill, pthread_self, Pthread};
@@ -137,7 +137,7 @@ impl Monitor {
);
}
}
- StackPool::get_instance().clean();
+ MemoryPool::get_instance().clean();
//monitor线程不执行协程计算任务,每次循环至少wait 1ms
monitor.blocker.clone().block(Duration::from_millis(1));
}
diff --git a/core/src/net/mod.rs b/core/src/net/mod.rs
index 9bbcc28a..cec6b070 100644
--- a/core/src/net/mod.rs
+++ b/core/src/net/mod.rs
@@ -1,5 +1,5 @@
+use crate::config::Config;
use crate::coroutine::suspender::Suspender;
-use crate::net::config::Config;
use crate::net::event_loop::EventLoop;
use crate::net::join::JoinHandle;
use crate::{error, info};
@@ -30,10 +30,6 @@ mod operator;
#[allow(missing_docs)]
pub mod event_loop;
-/// Configuration for `EventLoops`.
-#[allow(missing_docs)]
-pub mod config;
-
/// Task join abstraction and impl.
pub mod join;
diff --git a/hook/src/lib.rs b/hook/src/lib.rs
index b1701189..f387dec8 100644
--- a/hook/src/lib.rs
+++ b/hook/src/lib.rs
@@ -49,7 +49,8 @@
use once_cell::sync::OnceCell;
use open_coroutine_core::co_pool::task::UserTaskFunc;
-use open_coroutine_core::net::config::Config;
+use open_coroutine_core::config::Config;
+use open_coroutine_core::coroutine::stack_pool::MemoryPool;
use open_coroutine_core::net::join::JoinHandle;
use open_coroutine_core::net::{EventLoops, UserFunc};
use open_coroutine_core::scheduler::SchedulableCoroutine;
@@ -75,6 +76,9 @@ pub mod syscall;
/// Start the framework.
#[no_mangle]
pub extern "C" fn open_coroutine_init(config: Config) -> c_int {
+ if MemoryPool::init(&config).is_err() {
+ return -1;
+ }
EventLoops::init(&config);
_ = HOOK.get_or_init(|| config.hook());
0
diff --git a/macros/src/lib.rs b/macros/src/lib.rs
index 8b623975..6617c132 100644
--- a/macros/src/lib.rs
+++ b/macros/src/lib.rs
@@ -59,6 +59,8 @@ pub fn main(args: TokenStream, func: TokenStream) -> TokenStream {
let mut min_size = usize::MAX;
let mut max_size = usize::MAX;
let mut keep_alive_time = u64::MAX;
+ let mut min_memory_count = usize::MAX;
+ let mut memory_keep_alive_time = u64::MAX;
let mut hook = true;
if !args.is_empty() {
let tea_parser = syn::meta::parser(|meta| {
@@ -72,6 +74,10 @@ pub fn main(args: TokenStream, func: TokenStream) -> TokenStream {
max_size = meta.value()?.parse::()?.base10_parse()?;
} else if meta.path.is_ident("keep_alive_time") {
keep_alive_time = meta.value()?.parse::()?.base10_parse()?;
+ } else if meta.path.is_ident("min_memory_count") {
+ min_memory_count = meta.value()?.parse::()?.base10_parse()?;
+ } else if meta.path.is_ident("memory_keep_alive_time") {
+ memory_keep_alive_time = meta.value()?.parse::()?.base10_parse()?;
} else if meta.path.is_ident("hook") {
hook = meta.value()?.parse::()?.value();
}
@@ -109,6 +115,12 @@ pub fn main(args: TokenStream, func: TokenStream) -> TokenStream {
if #keep_alive_time != u64::MAX {
open_coroutine_config.set_keep_alive_time(#keep_alive_time);
}
+ if #min_memory_count != usize::MAX {
+ open_coroutine_config.set_min_memory_count(#min_memory_count);
+ }
+ if #memory_keep_alive_time != u64::MAX {
+ open_coroutine_config.set_memory_keep_alive_time(#memory_keep_alive_time);
+ }
if #hook != true {
open_coroutine_config.set_hook(#hook);
}
diff --git a/open-coroutine/src/lib.rs b/open-coroutine/src/lib.rs
index 31549929..d8b30876 100644
--- a/open-coroutine/src/lib.rs
+++ b/open-coroutine/src/lib.rs
@@ -49,7 +49,7 @@
use open_coroutine_core::co_pool::task::UserTaskFunc;
use open_coroutine_core::common::constants::SLICE;
-pub use open_coroutine_core::net::config::Config;
+pub use open_coroutine_core::config::Config;
use open_coroutine_core::net::UserFunc;
pub use open_coroutine_macros::*;
use std::cmp::Ordering;
@@ -322,7 +322,7 @@ pub fn connect_timeout(addr: A, timeout: Duration) -> std::io:
#[cfg(test)]
mod tests {
use crate::{init, shutdown};
- use open_coroutine_core::net::config::Config;
+ use open_coroutine_core::config::Config;
#[test]
fn test() {