diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1e13d861..9c3a7a04 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -114,9 +114,9 @@ jobs:
           - target: x86_64-pc-windows-gnu
             os: windows-latest
-          - target: i686-pc-windows-gnu
-            os: windows-latest
+#          - target: i686-pc-windows-gnu
+#            os: windows-latest
           - target: x86_64-pc-windows-msvc
             os: windows-latest
-          - target: i686-pc-windows-msvc
-            os: windows-latest
+#          - target: i686-pc-windows-msvc
+#            os: windows-latest
diff --git a/core/src/common/constants.rs b/core/src/common/constants.rs
index dc62031d..e8b0079b 100644
--- a/core/src/common/constants.rs
+++ b/core/src/common/constants.rs
@@ -15,6 +15,9 @@ pub const COROUTINE_GLOBAL_QUEUE_BEAN: &str = "coroutineGlobalQueueBean";
 /// Task global queue bean name.
 pub const TASK_GLOBAL_QUEUE_BEAN: &str = "taskGlobalQueueBean";
 
+/// Stack pool bean name.
+pub const STACK_POOL_BEAN: &str = "stackPoolBean";
+
 /// Monitor bean name.
 pub const MONITOR_BEAN: &str = "monitorBean";
diff --git a/core/src/coroutine/korosensei.rs b/core/src/coroutine/korosensei.rs
index 81004f3c..fdc5c861 100644
--- a/core/src/coroutine/korosensei.rs
+++ b/core/src/coroutine/korosensei.rs
@@ -2,9 +2,10 @@ use crate::catch;
 use crate::common::constants::CoroutineState;
 use crate::coroutine::listener::Listener;
 use crate::coroutine::local::CoroutineLocal;
+use crate::coroutine::stack_pool::{PooledStack, StackPool};
 use crate::coroutine::suspender::Suspender;
 use crate::coroutine::StackInfo;
-use corosensei::stack::{DefaultStack, Stack};
+use corosensei::stack::Stack;
 use corosensei::trap::TrapHandlerRegs;
 use corosensei::CoroutineResult;
 use std::cell::{Cell, RefCell};
@@ -26,7 +27,7 @@ cfg_if::cfg_if! {
 #[repr(C)]
 pub struct Coroutine<'c, Param, Yield, Return> {
     pub(crate) name: String,
-    inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, DefaultStack>,
+    inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, PooledStack>,
     pub(crate) state: Cell<CoroutineState<Yield, Return>>,
     pub(crate) stack_size: usize,
     pub(crate) stack_infos: RefCell<VecDeque<StackInfo>>,
@@ -307,12 +308,13 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
         stack_size: usize,
         callback: F,
     ) -> std::io::Result<R> {
+        let stack_pool = StackPool::get_instance();
         if let Some(co) = Self::current() {
             let remaining_stack = unsafe { co.remaining_stack() };
             if remaining_stack >= red_zone {
                 return Ok(callback());
             }
-            return DefaultStack::new(stack_size).map(|stack| {
+            return stack_pool.allocate(stack_size).map(|stack| {
                 co.stack_infos.borrow_mut().push_back(StackInfo {
                     stack_top: stack.base().get(),
                     stack_bottom: stack.limit().get(),
@@ -335,7 +337,7 @@
                 return Ok(callback());
             }
         }
-        DefaultStack::new(stack_size).map(|stack| {
+        stack_pool.allocate(stack_size).map(|stack| {
             STACK_INFOS.with(|s| {
                 s.borrow_mut().push_back(StackInfo {
                     stack_top: stack.base().get(),
@@ -378,7 +380,7 @@ where
     F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
 {
     let stack_size = stack_size.max(crate::common::page_size());
-    let stack = DefaultStack::new(stack_size)?;
+    let stack = StackPool::get_instance().allocate(stack_size)?;
     let stack_infos = RefCell::new(VecDeque::from([StackInfo {
         stack_top: stack.base().get(),
         stack_bottom: stack.limit().get(),
diff --git a/core/src/coroutine/mod.rs b/core/src/coroutine/mod.rs
index 125f007e..c7711647 100644
--- a/core/src/coroutine/mod.rs
+++ b/core/src/coroutine/mod.rs
@@ -76,6 +76,8 @@ pub struct StackInfo {
 /// Coroutine state abstraction and impl.
 mod state;
 
+pub(crate) mod stack_pool;
+
 impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
     /// Get the name of this coroutine.
     pub fn name(&self) -> &str {
diff --git a/core/src/coroutine/stack_pool.rs b/core/src/coroutine/stack_pool.rs
new file mode 100644
index 00000000..22a39fbd
--- /dev/null
+++ b/core/src/coroutine/stack_pool.rs
@@ -0,0 +1,297 @@
+use crate::common::beans::BeanFactory;
+use crate::common::constants::STACK_POOL_BEAN;
+use crate::common::now;
+use corosensei::stack::{DefaultStack, Stack, StackPointer};
+use std::cell::UnsafeCell;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::ops::{Deref, DerefMut};
+use std::rc::Rc;
+use std::sync::atomic::{AtomicU64, AtomicUsize};
+
+pub(crate) struct PooledStack {
+    stack_size: usize,
+    stack: Rc<UnsafeCell<DefaultStack>>,
+    create_time: u64,
+}
+
+impl Deref for PooledStack {
+    type Target = DefaultStack;
+
+    fn deref(&self) -> &DefaultStack {
+        unsafe {
+            self.stack
+                .deref()
+                .get()
+                .as_ref()
+                .expect("PooledStack is not unique")
+        }
+    }
+}
+
+impl DerefMut for PooledStack {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        unsafe {
+            self.stack
+                .deref()
+                .get()
+                .as_mut()
+                .expect("PooledStack is not unique")
+        }
+    }
+}
+
+impl Clone for PooledStack {
+    fn clone(&self) -> Self {
+        Self {
+            stack_size: self.stack_size,
+            stack: self.stack.clone(),
+            create_time: self.create_time,
+        }
+    }
+}
+
+impl PartialEq for PooledStack {
+    fn eq(&self, other: &Self) -> bool {
+        // keep `eq` consistent with the `Ord` impl below
+        self.cmp(other) == Ordering::Equal
+    }
+}
+
+impl Eq for PooledStack {}
+
+impl PartialOrd for PooledStack {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for PooledStack {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // `BinaryHeap` is a max-heap by default, but we need a min-heap, so all
+        // comparisons are reversed: the stack with the fewest owners, then the
+        // smallest size, then the earliest creation time surfaces first.
+        match Rc::strong_count(&other.stack).cmp(&Rc::strong_count(&self.stack)) {
+            Ordering::Less => Ordering::Less,
+            Ordering::Equal => match other.stack_size.cmp(&self.stack_size) {
+                Ordering::Less => Ordering::Less,
+                Ordering::Equal => other.create_time.cmp(&self.create_time),
+                Ordering::Greater => Ordering::Greater,
+            },
+            Ordering::Greater => Ordering::Greater,
+        }
+    }
+}
+
+unsafe impl Stack for PooledStack {
+    #[inline]
+    fn base(&self) -> StackPointer {
+        self.deref().base()
+    }
+
+    #[inline]
+    fn limit(&self) -> StackPointer {
+        self.deref().limit()
+    }
+
+    #[cfg(windows)]
+    #[inline]
+    fn teb_fields(&self) -> corosensei::stack::StackTebFields {
+        self.deref().teb_fields()
+    }
+
+    #[cfg(windows)]
+    #[inline]
+    fn update_teb_fields(&mut self, stack_limit: usize, guaranteed_stack_bytes: usize) {
+        self.deref_mut()
+            .update_teb_fields(stack_limit, guaranteed_stack_bytes);
+    }
+}
+
+impl PooledStack {
+    pub(crate) fn new(stack_size: usize) -> std::io::Result<Self> {
+        Ok(Self {
+            stack_size,
+            stack: Rc::new(UnsafeCell::new(DefaultStack::new(stack_size)?)),
+            create_time: now(),
+        })
+    }
+
+    /// This function must be called after a stack has finished running a coroutine
+    /// so that the `StackLimit` and `GuaranteedStackBytes` fields from the TEB can
+    /// be updated in the stack. This is necessary if the stack is reused for
+    /// another coroutine.
+    #[inline]
+    #[cfg(windows)]
+    pub(crate) fn update_stack_teb_fields(&mut self) {
+        cfg_if::cfg_if! {
+            if #[cfg(target_arch = "x86_64")] {
+                type StackWord = u64;
+            } else if #[cfg(target_arch = "x86")] {
+                type StackWord = u32;
+            }
+        }
+        let base = self.base().get() as *const StackWord;
+        unsafe {
+            let stack_limit = usize::try_from(*base.sub(1)).expect("stack limit overflow");
+            let guaranteed_stack_bytes =
+                usize::try_from(*base.sub(2)).expect("guaranteed stack bytes overflow");
+            self.update_teb_fields(stack_limit, guaranteed_stack_bytes);
+        }
+    }
+}
+
+pub(crate) struct StackPool {
+    pool: UnsafeCell<BinaryHeap<PooledStack>>,
+    len: AtomicUsize,
+    // the minimum number of pooled stacks, i.e. the core pool size
+    min_size: AtomicUsize,
+    // the maximum keep-alive time of non-core stacks, in nanoseconds
+    keep_alive_time: AtomicU64,
+}
+
+unsafe impl Send for StackPool {}
+
+unsafe impl Sync for StackPool {}
+
+impl Default for StackPool {
+    fn default() -> Self {
+        Self::new(0, 10_000_000_000)
+    }
+}
+
+impl StackPool {
+    pub(crate) fn get_instance<'m>() -> &'m Self {
+        BeanFactory::get_or_default(STACK_POOL_BEAN)
+    }
+
+    pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self {
+        Self {
+            pool: UnsafeCell::new(BinaryHeap::default()),
+            len: AtomicUsize::default(),
+            min_size: AtomicUsize::new(min_size),
+            keep_alive_time: AtomicU64::new(keep_alive_time),
+        }
+    }
+
+    pub(crate) fn allocate(&self, stack_size: usize) -> std::io::Result<PooledStack> {
+        let heap = unsafe { self.pool.get().as_mut().expect("StackPool is not unique") };
+        // reuse the smallest free stack that is large enough
+        let mut not_use = Vec::new();
+        while let Some(stack) = heap.peek() {
+            if Rc::strong_count(&stack.stack) > 1 {
+                // even the least-shared stack is in use, so no pooled stack is free
+                break;
+            }
+            #[allow(unused_mut)]
+            if let Some(mut stack) = heap.pop() {
+                self.sub_len();
+                if stack_size <= stack.stack_size {
+                    for s in not_use {
+                        heap.push(s);
+                        self.add_len();
+                    }
+                    heap.push(stack.clone());
+                    self.add_len();
+                    #[cfg(windows)]
+                    stack.update_stack_teb_fields();
+                    return Ok(stack);
+                }
+                if self.min_size() < self.len()
+                    && now() > stack.create_time.saturating_add(self.keep_alive_time())
+                {
+                    // drop the expired non-core stack
+                    continue;
+                }
+                not_use.push(stack);
+            }
+        }
+        // return the undersized stacks to the pool before allocating a new one
+        for s in not_use {
+            heap.push(s);
+            self.add_len();
+        }
+        let stack = PooledStack::new(stack_size)?;
+        heap.push(stack.clone());
+        self.add_len();
+        Ok(stack)
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn set_keep_alive_time(&self, keep_alive_time: u64) {
+        self.keep_alive_time
+            .store(keep_alive_time, std::sync::atomic::Ordering::Release);
+    }
+
+    pub(crate) fn keep_alive_time(&self) -> u64 {
+        self.keep_alive_time
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn set_min_size(&self, min_size: usize) {
+        self.min_size
+            .store(min_size, std::sync::atomic::Ordering::Release);
+    }
+
+    pub(crate) fn min_size(&self) -> usize {
+        self.min_size.load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    pub(crate) fn len(&self) -> usize {
+        self.len.load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    fn add_len(&self) {
+        _ = self.len.fetch_add(1, std::sync::atomic::Ordering::Release);
+    }
+
+    fn sub_len(&self) {
+        _ = self.len.fetch_sub(1, std::sync::atomic::Ordering::Release);
+    }
+
+    /// Clean the expired stacks.
+    #[allow(dead_code)]
+    pub(crate) fn clean(&self) {
+        let heap = unsafe { self.pool.get().as_mut().expect("StackPool is not unique") };
+        let mut maybe_free = Vec::new();
+        while let Some(stack) = heap.peek() {
+            if Rc::strong_count(&stack.stack) > 1 {
+                // the remaining stacks are still in use and can't be freed
+                break;
+            }
+            if let Some(stack) = heap.pop() {
+                self.sub_len();
+                maybe_free.push(stack);
+            }
+        }
+        for stack in maybe_free {
+            if self.min_size() < self.len()
+                && now() > stack.create_time.saturating_add(self.keep_alive_time())
+            {
+                // drop the expired non-core stack
+                continue;
+            }
+            heap.push(stack);
+            self.add_len();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::common::constants::DEFAULT_STACK_SIZE;
+
+    #[test]
+    fn test_stack_pool() -> std::io::Result<()> {
+        let pool = StackPool::default();
+        let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(Rc::strong_count(&stack.stack), 2);
+        drop(stack);
+        let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(Rc::strong_count(&stack.stack), 2);
+        assert_eq!(pool.len(), 1);
+        _ = pool.allocate(DEFAULT_STACK_SIZE)?;
+        assert_eq!(pool.len(), 2);
+        Ok(())
+    }
+}
diff --git a/core/src/monitor.rs b/core/src/monitor.rs
index 855ac47b..2da07782 100644
--- a/core/src/monitor.rs
+++ b/core/src/monitor.rs
@@ -3,6 +3,7 @@ use crate::common::constants::{CoroutineState, MONITOR_BEAN};
 use crate::common::{get_timeout_time, now, CondvarBlocker};
 use crate::coroutine::listener::Listener;
 use crate::coroutine::local::CoroutineLocal;
+use crate::coroutine::stack_pool::StackPool;
 use crate::scheduler::SchedulableSuspender;
 use crate::{catch, error, impl_current_for, impl_display_by_debug, info};
 use nix::sys::pthread::{pthread_kill, pthread_self, Pthread};
@@ -136,6 +137,7 @@ impl Monitor {
                 );
             }
         }
+        StackPool::get_instance().clean();
         // the monitor thread doesn't run coroutine tasks; each loop waits at least 1ms
         monitor.blocker.clone().block(Duration::from_millis(1));
     }
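
Reviewer note: the sketch below is a minimal, self-contained model of the reuse pattern this PR introduces, not code from the PR itself. Plain `Vec<u8>` buffers stand in for corosensei's `DefaultStack`, and the `Pool`/`Buf` names are illustrative. It shows the two load-bearing ideas: `Rc::strong_count == 1` marks an entry as free (only the pool holds it), and a reversed `Ord` turns `std::collections::BinaryHeap` into a min-heap so the least-shared, smallest buffer surfaces first.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::rc::Rc;

struct Buf {
    size: usize,
    // strong_count == 1 means only the pool holds it, i.e. the buffer is free
    data: Rc<Vec<u8>>,
}

impl PartialEq for Buf {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for Buf {}

impl PartialOrd for Buf {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Buf {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed comparison: BinaryHeap is a max-heap, so reversing the
        // (strong_count, size) key yields a min-heap over the natural key.
        Rc::strong_count(&other.data)
            .cmp(&Rc::strong_count(&self.data))
            .then_with(|| other.size.cmp(&self.size))
    }
}

#[derive(Default)]
struct Pool(BinaryHeap<Buf>);

impl Pool {
    fn allocate(&mut self, size: usize) -> Rc<Vec<u8>> {
        let mut too_small = Vec::new();
        while let Some(top) = self.0.peek() {
            if Rc::strong_count(&top.data) > 1 {
                // even the least-shared buffer is in use: nothing is free
                break;
            }
            let buf = self.0.pop().expect("peek guaranteed an element");
            if size <= buf.size {
                // a free buffer that is large enough: hand out a clone and
                // keep the pool's reference so strong_count tracks usage
                let data = buf.data.clone();
                for b in too_small {
                    self.0.push(b);
                }
                self.0.push(buf);
                return data;
            }
            // free but undersized: set it aside and look deeper
            too_small.push(buf);
        }
        for b in too_small {
            self.0.push(b);
        }
        // no reusable buffer: create one and retain a reference in the pool
        let data = Rc::new(vec![0u8; size]);
        self.0.push(Buf { size, data: data.clone() });
        data
    }
}

fn main() {
    let mut pool = Pool::default();
    let a = pool.allocate(4096);
    assert_eq!(Rc::strong_count(&a), 2); // held by the pool and the caller
    drop(a); // strong_count falls back to 1: the buffer becomes reusable
    let b = pool.allocate(4096);
    assert_eq!(pool.0.len(), 1); // reused, not reallocated
    drop(b);
}
```

One caveat this model shares with the PR: an element's sort key (its strong count) changes while it sits inside the heap, so the heap ordering is only refreshed when entries are popped and re-pushed; `allocate` tolerates that by re-checking `strong_count` on every pop.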