diff --git a/crates/rspack_core/src/utils/task_loop.rs b/crates/rspack_core/src/utils/task_loop.rs index 3b9b0aa1629..a1dffd518a5 100644 --- a/crates/rspack_core/src/utils/task_loop.rs +++ b/crates/rspack_core/src/utils/task_loop.rs @@ -71,14 +71,34 @@ pub fn run_task_loop_with_event( // mark whether the task loop has been returned // the async task should not call `tx.send` after this mark to true let is_expected_shutdown: Arc = Arc::new(AtomicBool::new(false)); - let mut queue: VecDeque>> = VecDeque::from(init_tasks); + let (async_task, sync_task): (Vec<_>, Vec<_>) = init_tasks + .into_iter() + .partition(|x| matches!(x.get_task_type(), TaskType::Async)); + // FIXME: we should create a TaskQueue struct to abstract sync_task | async_task schedule later + let mut sync_queue: VecDeque>> = VecDeque::from(sync_task); + let mut async_queue: VecDeque>> = VecDeque::from(async_task); let mut active_task_count = 0; tokio::task::block_in_place(|| loop { - let task = queue.pop_front(); + // clear async_loop first, since async_task can run parallel + while let Some(task) = async_queue.pop_front() { + let tx = tx.clone(); + let is_expected_shutdown = is_expected_shutdown.clone(); + active_task_count += 1; + // safe expect here since spawn always return Ok + tokio::task::Builder::new() + .name(task.name()) + .spawn(async move { + let r = task.async_run().await; + if !is_expected_shutdown.load(Ordering::Relaxed) { + tx.send(r).expect("failed to send error message"); + } + }) + .expect("spawn task failed"); + } + let task = sync_queue.pop_front(); if task.is_none() && active_task_count == 0 { return Ok(()); } - if let Some(task) = task { let task = before_task_run(ctx, task); match task.get_task_type() { @@ -100,7 +120,13 @@ pub fn run_task_loop_with_event( TaskType::Sync => { // merge sync task result directly match task.sync_run(ctx) { - Ok(r) => queue.extend(r), + Ok(r) => { + let (async_task, sync_task): (Vec<_>, Vec<_>) = r + .into_iter() + .partition(|x| matches!(x.get_task_type(), TaskType::Async)); + sync_queue.extend(sync_task); + async_queue.extend(async_task); + } Err(e) => { is_expected_shutdown.store(true, Ordering::Relaxed); return Err(e); @@ -110,7 +136,7 @@ pub fn run_task_loop_with_event( } } - let data = if queue.is_empty() && active_task_count != 0 { + let data = if sync_queue.is_empty() && async_queue.is_empty() && active_task_count != 0 { Handle::current().block_on(async { let res = rx.recv().await.expect("should recv success"); Ok(res) @@ -124,7 +150,13 @@ pub fn run_task_loop_with_event( active_task_count -= 1; // merge async task result match r { - Ok(r) => queue.extend(r), + Ok(r) => { + let (async_task, sync_task): (Vec<_>, Vec<_>) = r + .into_iter() + .partition(|x| matches!(x.get_task_type(), TaskType::Async)); + sync_queue.extend(sync_task); + async_queue.extend(async_task); + } Err(e) => { is_expected_shutdown.store(true, Ordering::Relaxed); return Err(e);