Skip to content

Commit

Permalink
Propagate action cancellation for Funrun (#24942)
Browse files Browse the repository at this point in the history
This would stop the actions if backend goes away but otherwise result in the same behavior as today.

GitOrigin-RevId: 9ab50448ef5afde7815e2cdc9155dad46aedd117
  • Loading branch information
Preslav Le authored and Convex, Inc. committed May 1, 2024
1 parent 572040b commit 6282906
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
10 changes: 8 additions & 2 deletions crates/function_runner/src/isolate_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<RT: Runtime> IsolateWorker<RT> for FunctionRunnerIsolateWorker<RT> {
RequestType::Action {
request,
environment_data,
response,
mut response,
queue_timer,
action_callbacks,
fetch_client,
Expand All @@ -143,7 +143,13 @@ impl<RT: Runtime> IsolateWorker<RT> for FunctionRunnerIsolateWorker<RT> {
request.context,
);
let r = environment
.run_action(client_id, isolate, isolate_clean, request.params.clone())
.run_action(
client_id,
isolate,
isolate_clean,
request.params.clone(),
response.cancellation().boxed(),
)
.await;

let status = match &r {
Expand Down
13 changes: 10 additions & 3 deletions crates/isolate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1550,7 +1550,7 @@ impl<RT: Runtime> IsolateWorker<RT> for BackendIsolateWorker<RT> {
RequestType::HttpAction {
request,
environment_data,
response,
mut response,
queue_timer,
action_callbacks,
fetch_client,
Expand Down Expand Up @@ -1578,6 +1578,7 @@ impl<RT: Runtime> IsolateWorker<RT> for BackendIsolateWorker<RT> {
isolate_clean,
request.router_path,
request.http_request,
response.cancellation().boxed(),
)
.await;
let status = match &r {
Expand All @@ -1597,7 +1598,7 @@ impl<RT: Runtime> IsolateWorker<RT> for BackendIsolateWorker<RT> {
RequestType::Action {
request,
environment_data,
response,
mut response,
queue_timer,
action_callbacks,
fetch_client,
Expand All @@ -1617,7 +1618,13 @@ impl<RT: Runtime> IsolateWorker<RT> for BackendIsolateWorker<RT> {
request.context,
);
let r = environment
.run_action(client_id, isolate, isolate_clean, request.params.clone())
.run_action(
client_id,
isolate,
isolate_clean,
request.params.clone(),
response.cancellation().boxed(),
)
.await;
let status = match &r {
Ok(outcome) => {
Expand Down
24 changes: 22 additions & 2 deletions crates/isolate/src/environment/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use database::Transaction;
use deno_core::v8;
use futures::{
channel::mpsc,
future::BoxFuture,
select_biased,
stream::BoxStream,
Future,
Expand Down Expand Up @@ -154,6 +155,7 @@ use crate::{
},
metrics::{
self,
log_isolate_request_cancelled,
log_unawaited_pending_op,
},
ops::OpProvider,
Expand Down Expand Up @@ -257,6 +259,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
isolate_clean: &mut bool,
validated_path: ValidatedHttpPath,
request: HttpActionRequest,
cancellation: BoxFuture<'_, ()>,
) -> anyhow::Result<HttpActionOutcome> {
let client_id = Arc::new(client_id);
let start_unix_timestamp = self.rt.unix_timestamp();
Expand All @@ -278,6 +281,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
&mut isolate_context,
validated_path.canonicalized_udf_path(),
request,
cancellation,
)
.await;
// Override the returned result if we hit a termination error.
Expand Down Expand Up @@ -325,6 +329,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
isolate: &mut RequestScope<'_, '_, RT, Self>,
router_path: &CanonicalizedUdfPath,
http_request: HttpActionRequest,
cancellation: BoxFuture<'_, ()>,
) -> anyhow::Result<(HttpActionRoute, Result<HttpActionResponse, JsError>)> {
let handle = isolate.handle();
let mut v8_scope = isolate.scope();
Expand Down Expand Up @@ -415,6 +420,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
v8_function,
&v8_args,
Self::collect_http_result,
cancellation,
)
.await?;
Ok((route, result))
Expand Down Expand Up @@ -478,6 +484,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
isolate: &mut Isolate<RT>,
isolate_clean: &mut bool,
request_params: ActionRequestParams,
cancellation: BoxFuture<'_, ()>,
) -> anyhow::Result<ActionOutcome> {
let client_id = Arc::new(client_id);
let start_unix_timestamp = self.rt.unix_timestamp();
Expand All @@ -493,8 +500,13 @@ impl<RT: Runtime> ActionEnvironment<RT> {
let mut isolate_context =
RequestScope::new(&mut context_scope, handle.clone(), state, true).await?;

let mut result =
Self::run_action_inner(client_id, &mut isolate_context, request_params.clone()).await;
let mut result = Self::run_action_inner(
client_id,
&mut isolate_context,
request_params.clone(),
cancellation,
)
.await;

// Perform a microtask checkpoint one last time before taking the environment
// to ensure the microtask queue is empty. Otherwise, JS from this request may
Expand Down Expand Up @@ -539,6 +551,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
client_id: Arc<String>,
isolate: &mut RequestScope<'_, '_, RT, Self>,
request_params: ActionRequestParams,
cancellation: BoxFuture<'_, ()>,
) -> anyhow::Result<Result<ConvexValue, JsError>> {
let handle = isolate.handle();
let mut v8_scope = isolate.scope();
Expand Down Expand Up @@ -625,6 +638,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
let result = deserialize_udf_result(&udf_path, &result_str)?;
Ok(async move { Ok(result) })
},
cancellation,
)
.await
}
Expand Down Expand Up @@ -752,6 +766,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
&mut ExecutionScope<'a, 'b, RT, Self>,
String,
) -> anyhow::Result<Fut>,
cancellation: BoxFuture<'_, ()>,
) -> anyhow::Result<Result<T, JsError>>
where
Fut: Future<Output = anyhow::Result<Result<T, JsError>>> + Send + 'static,
Expand Down Expand Up @@ -784,6 +799,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
// collecting a result. Using None would be nice, but `select_biased!`
// does not like Options.
let mut collecting_result = (async { std::future::pending().await }).boxed().fuse();
let mut cancellation = cancellation.fuse();
let result = loop {
// Advance the user's promise as far as it can go by draining the microtask
// queue.
Expand Down Expand Up @@ -914,6 +930,10 @@ impl<RT: Runtime> ActionEnvironment<RT> {
_ = timeout.fuse() => {
continue;
},
_ = cancellation => {
log_isolate_request_cancelled();
anyhow::bail!("Cancelled");
},
}
let permit_acquire = scope.with_state_mut(|state| {
state
Expand Down

0 comments on commit 6282906

Please sign in to comment.