From 66c1941b47fa45ca84aac394de38bd3f569ff729 Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Wed, 17 Jun 2026 17:06:12 -0600 Subject: [PATCH 1/2] add `Func::ready_to_call` and use it in `wasmtime_wasi_http::handler` This allows the caller to wait for the underlying component instance to be ready to call, i.e. for any outstanding sync calls to finish and for backpressure to be disabled, if applicable. This is particularly useful in `wasmtime_wasi_http::handler` where we need to decide whether a given worker is available to handle additional requests. Previously, that decision did not take backpressure into account, meaning the worker would continue to accept requests even when the guest had enabled backpressure. However, a worker whose guest has enabled backpressure should _not_ accept new requests; instead, it should mark itself unavailable and allow another worker to accept them. --- crates/wasi-http/src/handler.rs | 36 +++++++- .../src/runtime/component/concurrent.rs | 84 +++++++++++++++++++ .../src/runtime/component/concurrent/func.rs | 11 +++ 3 files changed, 129 insertions(+), 2 deletions(-) diff --git a/crates/wasi-http/src/handler.rs b/crates/wasi-http/src/handler.rs index 2dc49ab90766..62f7c9df5b0d 100644 --- a/crates/wasi-http/src/handler.rs +++ b/crates/wasi-http/src/handler.rs @@ -31,7 +31,7 @@ use std::sync::{ use std::task::{Context, Poll}; use std::time::Instant; use tokio::sync::Notify; -use wasmtime::component::{Accessor, GuestTaskId, Resource, TypedFuncCallConcurrent}; +use wasmtime::component::{Accessor, GuestTaskId, HasData, Resource, TypedFuncCallConcurrent}; #[cfg(feature = "p2")] use wasmtime::error::Context as _; use wasmtime::{AsContextMut, Result, Store, StoreContextMut, format_err}; @@ -793,6 +793,7 @@ where Some((pair, queue)) } )); + let mut ready = pin!(when_ready(accessor, proxy)); future::poll_fn(|cx| { loop { // First, and crucially first, poll `futures`. This way @@ -839,6 +840,13 @@ where Poll::Ready(None) | Poll::Pending => {} } + ready.set(when_ready(accessor, proxy)); + let is_ready = match ready.as_mut().poll(cx) { + Poll::Ready(Ok(())) => true, + Poll::Ready(Err(error)) => break Poll::Ready(Err(error)), + Poll::Pending => false, + }; + // At this point `futures` is either empty or it's `Pending` // meaning nothing is ready. Note that `Pending` here // doesn't necessarily mean all tasks are blocked on I/O. @@ -850,6 +858,7 @@ where // at all or all our tasks really are blocked on I/O. self.set_available( may_accept + && is_ready && match dropper .state .should_accept_request(futures.len(), reuse_count) @@ -905,7 +914,7 @@ where // if we're not actually capable of accepting any more work, // then we're completely done and it's time to exit this // worker. - if !may_accept { + if !(may_accept && is_ready) { break Poll::Ready(Ok(())); } @@ -1401,3 +1410,26 @@ impl<'a, T: Send> Prepared<'a, T> { } } } + +async fn when_ready(accessor: &Accessor, proxy: &Proxy) -> Result<()> { + match proxy { + #[cfg(feature = "p3")] + Proxy::P3(guest) => { + guest + .wasi_http_handler() + .func_handle() + .func() + .ready_to_call(accessor) + .await + } + #[cfg(feature = "p2")] + Proxy::P2(guest) => { + guest + .wasi_http_incoming_handler() + .func_handle() + .func() + .ready_to_call(accessor) + .await + } + } +} diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 5ec4de15bb15..d858f61ccf30 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -1843,6 +1843,9 @@ impl StoreOpaque { /// Iterate over `InstanceState::pending`, moving any ready items into the /// "high priority" work item queue. /// + /// Also, wake any wakers interested in the transition from not ready to + /// ready. + /// /// See `GuestCall::is_ready` for details. fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> { for (thread, kind) in @@ -1860,6 +1863,20 @@ impl StoreOpaque { } } + for waker in self + .instance_state(instance) + .concurrent_state() + .wakers + .get_mut() + .iter_mut() + { + if let Some(waker) = waker.downcast_ref::() { + waker.wake_by_ref(); + } else { + bail_bug!("`ConcurrentInstanceState::wakers` should contain only `Waker`s"); + } + } + Ok(()) } @@ -4989,6 +5006,7 @@ pub struct ConcurrentInstanceState { /// Pending calls for this instance which require `Self::backpressure` to be /// `true` and/or `Self::do_not_enter` to be false before they can proceed. pending: BTreeMap, + wakers: AlwaysMut, } impl ConcurrentInstanceState { @@ -5702,3 +5720,69 @@ fn queue_call0( ) } } + +pub(crate) struct ReadyToCall<'a, T: 'static, D: HasData + ?Sized> { + accessor: &'a Accessor, + func: Func, + waker_key: Option>, +} + +impl Future for ReadyToCall<'_, T, D> { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.accessor.with(|mut store| { + let store = store.as_context_mut(); + let (_, _, _, raw_options) = self.func.abi_info(store.0); + let instance = self.func.instance().runtime_instance(raw_options.instance); + let state = store.0.instance_state(instance).concurrent_state(); + if state.backpressure == 0 { + Poll::Ready(Ok(())) + } else { + let waker = cx.waker().clone(); + if let Some(key) = &self.waker_key { + *state.wakers.get_mut().get_mut(key).unwrap() = waker; + } else { + self.waker_key = Some( + state + .wakers + .get_mut() + .push(waker) + .map_err(crate::Error::from)?, + ); + } + Poll::Pending + } + }) + } +} + +impl Drop for ReadyToCall<'_, T, D> { + fn drop(&mut self) { + if let Some(key) = self.waker_key.take() { + self.accessor.with(|mut store| { + let store = store.as_context_mut(); + let (_, _, _, raw_options) = self.func.abi_info(store.0); + let instance = self.func.instance().runtime_instance(raw_options.instance); + _ = store + .0 + .instance_state(instance) + .concurrent_state() + .wakers + .get_mut() + .delete(key); + }); + } + } +} + +pub(crate) fn ready_to_call<'a, T, D: HasData + ?Sized>( + accessor: &'a Accessor, + func: Func, +) -> ReadyToCall<'a, T, D> { + ReadyToCall { + accessor, + func, + waker_key: None, + } +} diff --git a/crates/wasmtime/src/runtime/component/concurrent/func.rs b/crates/wasmtime/src/runtime/component/concurrent/func.rs index 371bb8e621e9..0184063fa2ca 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/func.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/func.rs @@ -163,6 +163,17 @@ impl Func { Ok(()) } + /// Returns a future which will resolve once the component instance + /// corresponding to this function is ready to accept a call without queuing + /// it (i.e. does not have backpressure enabled and does not have a sync + /// call in progress). + pub async fn ready_to_call( + self, + accessor: impl AsAccessor, + ) -> Result<()> { + concurrent::ready_to_call(accessor.as_accessor(), self).await + } + /// Calls `concurrent::prepare_call` with monomorphized functions for /// lowering the parameters and lifting the result. fn prepare_call_dynamic<'a, T: Send + 'static>( From 9c893b0a9042a59491823355b92a844fbd15725a Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Thu, 18 Jun 2026 12:56:20 -0600 Subject: [PATCH 2/2] add test for `Func::ready_for_concurrent_call` --- .../tests/scenario/backpressure.rs | 130 ++++++++++++++++-- .../misc/component-async-tests/wit/test.wit | 1 + .../src/bin/async_backpressure_callee.rs | 10 ++ .../src/bin/async_cancel_callee.rs | 10 ++ crates/wasi-http/src/handler.rs | 4 +- .../src/runtime/component/concurrent/func.rs | 8 +- 6 files changed, 149 insertions(+), 14 deletions(-) diff --git a/crates/misc/component-async-tests/tests/scenario/backpressure.rs b/crates/misc/component-async-tests/tests/scenario/backpressure.rs index 21f18cdf2723..92bd66b4a97b 100644 --- a/crates/misc/component-async-tests/tests/scenario/backpressure.rs +++ b/crates/misc/component-async-tests/tests/scenario/backpressure.rs @@ -1,13 +1,127 @@ -use wasmtime::Result; +use component_async_tests::Ctx; +use std::{ + env, future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use wasmtime::{ + Engine, Result, Store, + component::{Linker, ResourceTable}, +}; +use wasmtime_wasi::WasiCtxBuilder; -use super::util::test_run; +use super::util::{config, make_component, test_run}; -// No-op function; we only test this by composing it in `async_backpressure_caller` -#[allow( - dead_code, - reason = "here only to make the `assert_test_exists` macro happy" -)] -pub fn async_backpressure_callee() {} +mod callee { + wasmtime::component::bindgen!({ + path: "wit", + world: "backpressure-callee", + exports: { default: async | store }, + }); +} + +#[tokio::test] +pub async fn async_backpressure_callee() -> Result<()> { + let mut config = config(); + // As of this writing, miri/pulley/epochs is a problematic combination, so + // we don't test it. + if env::var_os("MIRI_TEST_CWASM_DIR").is_none() { + config.epoch_interruption(true); + } + + let engine = Engine::new(&config)?; + let component = make_component( + &engine, + &[test_programs_artifacts::ASYNC_BACKPRESSURE_CALLEE_COMPONENT], + ) + .await?; + let mut linker = Linker::new(&engine); + wasmtime_wasi::p2::add_to_linker_async(&mut linker)?; + + let mut store = Store::new( + &engine, + Ctx { + wasi: WasiCtxBuilder::new().inherit_stdio().build(), + table: ResourceTable::default(), + continue_: false, + }, + ); + + if env::var_os("MIRI_TEST_CWASM_DIR").is_none() { + store.set_epoch_deadline(1); + + std::thread::spawn(move || { + std::thread::sleep(Duration::from_secs(10)); + engine.increment_epoch(); + }); + } + + let guest = + callee::BackpressureCallee::instantiate_async(&mut store, &component, &linker).await?; + + store + .run_concurrent(async |accessor| { + guest + .local_local_backpressure() + .call_inc_then_later_dec_backpressure(accessor) + .await?; + + let mut instance = Some(Box::pin( + guest + .local_local_run() + .func_run() + .func() + .ready_for_concurrent_call(accessor), + )); + + let mut a = Some(Box::pin(guest.local_local_run().call_run(accessor))); + let mut b = Some(Box::pin(guest.local_local_run().call_run(accessor))); + let mut c = Some(Box::pin(guest.local_local_run().call_run(accessor))); + + let mut backpressure_is_set = true; + future::poll_fn(move |cx| { + let instance_ready = is_ready(cx, &mut instance); + let a_ready = is_ready(cx, &mut a); + let b_ready = is_ready(cx, &mut b); + let c_ready = is_ready(cx, &mut c); + + if backpressure_is_set { + assert!(!instance_ready); + assert!(!a_ready); + assert!(!b_ready); + assert!(!c_ready); + + backpressure_is_set = false; + + Poll::Pending + } else if instance_ready && a_ready && b_ready && c_ready { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + + wasmtime::error::Ok(()) + }) + .await??; + + Ok(()) +} + +fn is_ready(cx: &mut Context, fut: &mut Option>>) -> bool { + if let Some(v) = fut.as_mut() { + if v.as_mut().poll(cx).is_ready() { + *fut = None; + true + } else { + false + } + } else { + true + } +} #[tokio::test] pub async fn async_backpressure_caller() -> Result<()> { diff --git a/crates/misc/component-async-tests/wit/test.wit b/crates/misc/component-async-tests/wit/test.wit index 84efb8ec40a0..59eae0d05deb 100644 --- a/crates/misc/component-async-tests/wit/test.wit +++ b/crates/misc/component-async-tests/wit/test.wit @@ -63,6 +63,7 @@ interface backpressure { set-backpressure: func(enabled: bool); inc-backpressure: func(); dec-backpressure: func(); + inc-then-later-dec-backpressure: async func(); } interface transmit { diff --git a/crates/test-programs/src/bin/async_backpressure_callee.rs b/crates/test-programs/src/bin/async_backpressure_callee.rs index 7f89a67d77fa..ec1fef7f0529 100644 --- a/crates/test-programs/src/bin/async_backpressure_callee.rs +++ b/crates/test-programs/src/bin/async_backpressure_callee.rs @@ -32,6 +32,16 @@ impl Backpressure for Component { fn dec_backpressure() { wit_bindgen::backpressure_dec(); } + async fn inc_then_later_dec_backpressure() { + wit_bindgen::backpressure_inc(); + + wit_bindgen::spawn_local(async { + for _ in 0..10 { + wit_bindgen::yield_async().await; + } + wit_bindgen::backpressure_dec(); + }); + } } // Unused function; required since this file is built as a `bin`: diff --git a/crates/test-programs/src/bin/async_cancel_callee.rs b/crates/test-programs/src/bin/async_cancel_callee.rs index 2ee686dec974..b92410843cf5 100644 --- a/crates/test-programs/src/bin/async_cancel_callee.rs +++ b/crates/test-programs/src/bin/async_cancel_callee.rs @@ -97,6 +97,16 @@ unsafe extern "C" fn export_dec_backpressure() { wit_bindgen::backpressure_dec(); } +#[unsafe(export_name = "[async-lift]local:local/backpressure#inc-then-later-dec-backpressure")] +unsafe extern "C" fn export_inc_then_later_dec_backpressure() -> u32 { + todo!() +} + +#[unsafe(export_name = "[callback][async-lift]local:local/backpressure#inc-then-later-dec-backpressure")] +unsafe extern "C" fn callback_inc_then_later_dec_backpressure(_: u32, _: u32, _: u32) -> u32 { + todo!() +} + #[unsafe(export_name = "local:local/yield#yield-times")] unsafe extern "C" fn export_yield_yield_times(times: u64) { unsafe { diff --git a/crates/wasi-http/src/handler.rs b/crates/wasi-http/src/handler.rs index 62f7c9df5b0d..69f8fa7bf61f 100644 --- a/crates/wasi-http/src/handler.rs +++ b/crates/wasi-http/src/handler.rs @@ -1419,7 +1419,7 @@ async fn when_ready(accessor: &Accessor, proxy: &Proxy) -> .wasi_http_handler() .func_handle() .func() - .ready_to_call(accessor) + .ready_for_concurrent_call(accessor) .await } #[cfg(feature = "p2")] @@ -1428,7 +1428,7 @@ async fn when_ready(accessor: &Accessor, proxy: &Proxy) -> .wasi_http_incoming_handler() .func_handle() .func() - .ready_to_call(accessor) + .ready_for_concurrent_call(accessor) .await } } diff --git a/crates/wasmtime/src/runtime/component/concurrent/func.rs b/crates/wasmtime/src/runtime/component/concurrent/func.rs index 0184063fa2ca..365c0c7a5e93 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/func.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/func.rs @@ -164,10 +164,10 @@ impl Func { } /// Returns a future which will resolve once the component instance - /// corresponding to this function is ready to accept a call without queuing - /// it (i.e. does not have backpressure enabled and does not have a sync - /// call in progress). - pub async fn ready_to_call( + /// corresponding to this function is ready to run a concurrent call without + /// queuing it (i.e. does not have backpressure enabled and does not have a + /// sync call in progress). + pub async fn ready_for_concurrent_call( self, accessor: impl AsAccessor, ) -> Result<()> {