From 62829064daa493431cc27c3dbdb0ae99722a1f7f Mon Sep 17 00:00:00 2001 From: Preslav Le Date: Tue, 30 Apr 2024 16:57:57 -0700 Subject: [PATCH] Propagate action cancellation for Funrun (#24942) This would stop the actions if backend goes away but otherwise result in the same behavior as today. GitOrigin-RevId: 9ab50448ef5afde7815e2cdc9155dad46aedd117 --- crates/function_runner/src/isolate_worker.rs | 10 ++++++-- crates/isolate/src/client.rs | 13 ++++++++--- crates/isolate/src/environment/action/mod.rs | 24 ++++++++++++++++++-- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/crates/function_runner/src/isolate_worker.rs b/crates/function_runner/src/isolate_worker.rs index 53ca72a8..42a752a8 100644 --- a/crates/function_runner/src/isolate_worker.rs +++ b/crates/function_runner/src/isolate_worker.rs @@ -122,7 +122,7 @@ impl IsolateWorker for FunctionRunnerIsolateWorker { RequestType::Action { request, environment_data, - response, + mut response, queue_timer, action_callbacks, fetch_client, @@ -143,7 +143,13 @@ impl IsolateWorker for FunctionRunnerIsolateWorker { request.context, ); let r = environment - .run_action(client_id, isolate, isolate_clean, request.params.clone()) + .run_action( + client_id, + isolate, + isolate_clean, + request.params.clone(), + response.cancellation().boxed(), + ) .await; let status = match &r { diff --git a/crates/isolate/src/client.rs b/crates/isolate/src/client.rs index f5721798..08c6a996 100644 --- a/crates/isolate/src/client.rs +++ b/crates/isolate/src/client.rs @@ -1550,7 +1550,7 @@ impl IsolateWorker for BackendIsolateWorker { RequestType::HttpAction { request, environment_data, - response, + mut response, queue_timer, action_callbacks, fetch_client, @@ -1578,6 +1578,7 @@ impl IsolateWorker for BackendIsolateWorker { isolate_clean, request.router_path, request.http_request, + response.cancellation().boxed(), ) .await; let status = match &r { @@ -1597,7 +1598,7 @@ impl IsolateWorker for BackendIsolateWorker { RequestType::Action { request, environment_data, - response, + mut response, queue_timer, action_callbacks, fetch_client, @@ -1617,7 +1618,13 @@ impl IsolateWorker for BackendIsolateWorker { request.context, ); let r = environment - .run_action(client_id, isolate, isolate_clean, request.params.clone()) + .run_action( + client_id, + isolate, + isolate_clean, + request.params.clone(), + response.cancellation().boxed(), + ) .await; let status = match &r { Ok(outcome) => { diff --git a/crates/isolate/src/environment/action/mod.rs b/crates/isolate/src/environment/action/mod.rs index 2ef7cf78..7d82ca40 100644 --- a/crates/isolate/src/environment/action/mod.rs +++ b/crates/isolate/src/environment/action/mod.rs @@ -50,6 +50,7 @@ use database::Transaction; use deno_core::v8; use futures::{ channel::mpsc, + future::BoxFuture, select_biased, stream::BoxStream, Future, @@ -154,6 +155,7 @@ use crate::{ }, metrics::{ self, + log_isolate_request_cancelled, log_unawaited_pending_op, }, ops::OpProvider, @@ -257,6 +259,7 @@ impl ActionEnvironment { isolate_clean: &mut bool, validated_path: ValidatedHttpPath, request: HttpActionRequest, + cancellation: BoxFuture<'_, ()>, ) -> anyhow::Result { let client_id = Arc::new(client_id); let start_unix_timestamp = self.rt.unix_timestamp(); @@ -278,6 +281,7 @@ impl ActionEnvironment { &mut isolate_context, validated_path.canonicalized_udf_path(), request, + cancellation, ) .await; // Override the returned result if we hit a termination error. @@ -325,6 +329,7 @@ impl ActionEnvironment { isolate: &mut RequestScope<'_, '_, RT, Self>, router_path: &CanonicalizedUdfPath, http_request: HttpActionRequest, + cancellation: BoxFuture<'_, ()>, ) -> anyhow::Result<(HttpActionRoute, Result)> { let handle = isolate.handle(); let mut v8_scope = isolate.scope(); @@ -415,6 +420,7 @@ impl ActionEnvironment { v8_function, &v8_args, Self::collect_http_result, + cancellation, ) .await?; Ok((route, result)) @@ -478,6 +484,7 @@ impl ActionEnvironment { isolate: &mut Isolate, isolate_clean: &mut bool, request_params: ActionRequestParams, + cancellation: BoxFuture<'_, ()>, ) -> anyhow::Result { let client_id = Arc::new(client_id); let start_unix_timestamp = self.rt.unix_timestamp(); @@ -493,8 +500,13 @@ impl ActionEnvironment { let mut isolate_context = RequestScope::new(&mut context_scope, handle.clone(), state, true).await?; - let mut result = - Self::run_action_inner(client_id, &mut isolate_context, request_params.clone()).await; + let mut result = Self::run_action_inner( + client_id, + &mut isolate_context, + request_params.clone(), + cancellation, + ) + .await; // Perform a microtask checkpoint one last time before taking the environment // to ensure the microtask queue is empty. Otherwise, JS from this request may @@ -539,6 +551,7 @@ impl ActionEnvironment { client_id: Arc, isolate: &mut RequestScope<'_, '_, RT, Self>, request_params: ActionRequestParams, + cancellation: BoxFuture<'_, ()>, ) -> anyhow::Result> { let handle = isolate.handle(); let mut v8_scope = isolate.scope(); @@ -625,6 +638,7 @@ impl ActionEnvironment { let result = deserialize_udf_result(&udf_path, &result_str)?; Ok(async move { Ok(result) }) }, + cancellation, ) .await } @@ -752,6 +766,7 @@ impl ActionEnvironment { &mut ExecutionScope<'a, 'b, RT, Self>, String, ) -> anyhow::Result, + cancellation: BoxFuture<'_, ()>, ) -> anyhow::Result> where Fut: Future>> + Send + 'static, @@ -784,6 +799,7 @@ impl ActionEnvironment { // collecting a result. Using None would be nice, but `select_biased!` // does not like Options. let mut collecting_result = (async { std::future::pending().await }).boxed().fuse(); + let mut cancellation = cancellation.fuse(); let result = loop { // Advance the user's promise as far as it can go by draining the microtask // queue. @@ -914,6 +930,10 @@ impl ActionEnvironment { _ = timeout.fuse() => { continue; }, + _ = cancellation => { + log_isolate_request_cancelled(); + anyhow::bail!("Cancelled"); + }, } let permit_acquire = scope.with_state_mut(|state| { state