Skip to content

Commit

Permalink
[Http actions -> FunRun] Add new RPC to support running Http actions …
Browse files Browse the repository at this point in the history
…(#29944)

GitOrigin-RevId: 57c6c4c7749511a1132c0f7c3c4813de6e418361
  • Loading branch information
jordanhunt22 authored and Convex, Inc. committed Sep 21, 2024
1 parent b888c98 commit 372b67b
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 57 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 13 additions & 39 deletions crates/function_runner/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ pub struct RunRequestArgs {
pub context: ExecutionContext,
}

#[derive(Clone)]
pub struct FunctionMetadata {
pub path_and_args: ValidatedPathAndArgs,
pub journal: QueryJournal,
Expand Down Expand Up @@ -376,37 +377,19 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
pub async fn run_function_no_retention_check(
&self,
run_request_args: RunRequestArgs,
function_metadata: FunctionMetadata,
function_metadata: Option<FunctionMetadata>,
http_action_metadata: Option<HttpActionMetadata>,
) -> anyhow::Result<(
Option<FunctionFinalTransaction>,
FunctionOutcome,
FunctionUsageStats,
)> {
if run_request_args.udf_type == UdfType::HttpAction {
anyhow::bail!("You can't run http actions from this method");
}
self.run_function_no_retention_check_inner(run_request_args, Some(function_metadata), None)
.await
}

#[minitrace::trace]
pub async fn run_http_action_no_retention_check(
&self,
run_request_args: RunRequestArgs,
http_action_metadata: HttpActionMetadata,
) -> anyhow::Result<(FunctionOutcome, FunctionUsageStats)> {
if run_request_args.udf_type != UdfType::HttpAction {
anyhow::bail!("You can only run http actions with this method");
}
let (_, outcome, stats) = self
.run_function_no_retention_check_inner(
run_request_args,
None,
Some(http_action_metadata),
)
.await?;

Ok((outcome, stats))
self.run_function_no_retention_check_inner(
run_request_args,
function_metadata,
http_action_metadata,
)
.await
}

#[minitrace::trace]
Expand Down Expand Up @@ -739,22 +722,13 @@ impl<RT: Runtime> FunctionRunner<RT> for InProcessFunctionRunner<RT> {
let result = match udf_type {
UdfType::Query | UdfType::Mutation | UdfType::Action => {
self.server
.run_function_no_retention_check(
request_metadata,
function_metadata
.context("Missing function metadata for query, mutation or action")?,
)
.run_function_no_retention_check(request_metadata, function_metadata, None)
.await
},
UdfType::HttpAction => {
let (outcome, stats) = self
.server
.run_http_action_no_retention_check(
request_metadata,
http_action_metadata.context("Missing http action metadata")?,
)
.await?;
Ok((None, outcome, stats))
self.server
.run_function_no_retention_check(request_metadata, None, http_action_metadata)
.await
},
};
validate_run_function_result(udf_type, *ts, self.database.retention_validator()).await?;
Expand Down
4 changes: 4 additions & 0 deletions crates/isolate/src/environment/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ use crate::{
};

#[derive(Debug, Clone)]
#[cfg_attr(
any(test, feature = "testing"),
derive(proptest_derive::Arbitrary, PartialEq)
)]
pub enum HttpActionResult {
Streamed,
Error(JsError),
Expand Down
129 changes: 128 additions & 1 deletion crates/isolate/src/environment/action/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ use pb::{
function_result::Result as FunctionResultTypeProto,
FunctionResult as FunctionResultProto,
},
outcome::ActionOutcome as ActionOutcomeProto,
outcome::{
ActionOutcome as ActionOutcomeProto,
HttpActionOutcome as HttpActionOutcomeProto,
},
};
#[cfg(any(test, feature = "testing"))]
use proptest::prelude::*;
use semver::Version;
use serde_json::Value as JsonValue;
use value::ConvexValue;

use super::HttpActionResult;
#[cfg(any(test, feature = "testing"))]
use crate::HttpActionRequest;
use crate::{
environment::helpers::{
JsonPackedValue,
Expand Down Expand Up @@ -164,6 +170,7 @@ impl Arbitrary for ActionOutcome {
}

#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "testing"), derive(PartialEq))]
pub struct HttpActionOutcome {
pub route: HttpActionRoute,
pub http_request: HttpActionRequestHead,
Expand Down Expand Up @@ -207,6 +214,108 @@ impl HttpActionOutcome {
pub fn memory_in_mb(&self) -> u64 {
self.memory_in_mb
}

pub(crate) fn from_proto(
HttpActionOutcomeProto {
unix_timestamp,
result,
syscall_trace,
memory_in_mb,
}: HttpActionOutcomeProto,
http_request: HttpActionRequestHead,
udf_server_version: Option<Version>,
identity: InertIdentity,
) -> anyhow::Result<Self> {
let result = result.ok_or_else(|| anyhow::anyhow!("Missing result"))?;
let result = match result.result {
Some(FunctionResultTypeProto::JsonPackedValue(_)) => {
anyhow::bail!("Http actions not expected to have aresult")
},
Some(FunctionResultTypeProto::JsError(js_error)) => {
HttpActionResult::Error(js_error.try_into()?)
},
None => HttpActionResult::Streamed,
};
Ok(Self {
identity,
unix_timestamp: unix_timestamp
.context("Missing unix_timestamp")?
.try_into()?,
result,
syscall_trace: syscall_trace.context("Missing syscall_trace")?.try_into()?,
memory_in_mb,
http_request: http_request.clone(),
udf_server_version,
route: HttpActionRoute {
method: http_request.method.try_into()?,
path: http_request.url.to_string(),
},
})
}
}

impl TryFrom<HttpActionOutcome> for HttpActionOutcomeProto {
type Error = anyhow::Error;

fn try_from(
HttpActionOutcome {
route: _,
http_request: _,
identity: _,
unix_timestamp,
result,
syscall_trace,
udf_server_version: _,
memory_in_mb,
}: HttpActionOutcome,
) -> anyhow::Result<Self> {
let result = match result {
HttpActionResult::Streamed => None,
HttpActionResult::Error(js_error) => {
Some(FunctionResultTypeProto::JsError(js_error.try_into()?))
},
};
Ok(Self {
unix_timestamp: Some(unix_timestamp.into()),
result: Some(FunctionResultProto { result }),
syscall_trace: Some(syscall_trace.try_into()?),
memory_in_mb,
})
}
}

#[cfg(any(test, feature = "testing"))]
impl Arbitrary for HttpActionOutcome {
type Parameters = ();

type Strategy = impl Strategy<Value = HttpActionOutcome>;

fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
(
any::<HttpActionRequest>(),
any::<HttpActionResult>(),
any::<InertIdentity>(),
any::<UnixTimestamp>(),
any::<SyscallTrace>(),
any::<u64>(),
)
.prop_map(
|(request, result, identity, unix_timestamp, syscall_trace, memory_in_mb)| Self {
http_request: request.head.clone(),
result,
route: HttpActionRoute {
method: request.head.method.try_into().unwrap(),
path: request.head.url.to_string(),
},
identity,
unix_timestamp,
syscall_trace,
memory_in_mb,
// Ok to not generate semver::Version because it is not serialized anyway
udf_server_version: None,
},
)
}
}

#[cfg(test)]
Expand All @@ -216,8 +325,10 @@ mod tests {
use super::{
ActionOutcome,
ActionOutcomeProto,
HttpActionOutcomeProto,
ValidatedPathAndArgs,
};
use crate::HttpActionOutcome;

proptest! {
#![proptest_config(
Expand All @@ -244,5 +355,21 @@ mod tests {
).unwrap();
assert_eq!(udf_outcome, udf_outcome_from_proto);
}

#[test]
fn test_http_action_outcome_roundtrips(udf_outcome in any::<HttpActionOutcome>()) {
let udf_outcome_clone = udf_outcome.clone();
let http_request = udf_outcome.http_request.clone();
let version = udf_outcome.udf_server_version.clone();
let identity = udf_outcome_clone.identity.clone();
let proto = HttpActionOutcomeProto::try_from(udf_outcome_clone).unwrap();
let udf_outcome_from_proto = HttpActionOutcome::from_proto(
proto,
http_request,
version,
identity,
).unwrap();
assert_eq!(udf_outcome, udf_outcome_from_proto);
}
}
}
41 changes: 30 additions & 11 deletions crates/isolate/src/environment/helpers/outcome.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context;
use common::identity::InertIdentity;
use pb::outcome::{
function_outcome::Outcome as OutcomeProto,
Expand All @@ -12,6 +13,8 @@ use crate::{
},
udf::outcome::UdfOutcome,
},
HttpActionRequestHead,
ValidatedHttpPath,
ValidatedPathAndArgs,
};

Expand All @@ -32,22 +35,40 @@ pub enum FunctionOutcome {
impl FunctionOutcome {
pub fn from_proto(
FunctionOutcomeProto { outcome }: FunctionOutcomeProto,
path_and_args: ValidatedPathAndArgs,
path_and_args: Option<ValidatedPathAndArgs>,
http_metadata: Option<(ValidatedHttpPath, HttpActionRequestHead)>,
identity: InertIdentity,
) -> anyhow::Result<Self> {
let outcome = outcome.ok_or_else(|| anyhow::anyhow!("Missing outcome"))?;
match outcome {
OutcomeProto::Query(outcome) => Ok(FunctionOutcome::Query(UdfOutcome::from_proto(
outcome,
path_and_args,
path_and_args.context("Missing path and args")?,
identity,
)?)),
OutcomeProto::Mutation(outcome) => Ok(FunctionOutcome::Mutation(
UdfOutcome::from_proto(outcome, path_and_args, identity)?,
)),
OutcomeProto::Action(outcome) => Ok(FunctionOutcome::Action(
ActionOutcome::from_proto(outcome, path_and_args, identity)?,
)),
OutcomeProto::Mutation(outcome) => {
Ok(FunctionOutcome::Mutation(UdfOutcome::from_proto(
outcome,
path_and_args.context("Missing path and args")?,
identity,
)?))
},
OutcomeProto::Action(outcome) => {
Ok(FunctionOutcome::Action(ActionOutcome::from_proto(
outcome,
path_and_args.context("Missing path and args")?,
identity,
)?))
},
OutcomeProto::HttpAction(outcome) => {
let (http_path, http_request) = http_metadata.context("Missing http metadata")?;
Ok(FunctionOutcome::HttpAction(HttpActionOutcome::from_proto(
outcome,
http_request,
http_path.npm_version().clone(),
identity,
)?))
},
}
}
}
Expand All @@ -60,9 +81,7 @@ impl TryFrom<FunctionOutcome> for FunctionOutcomeProto {
FunctionOutcome::Query(outcome) => OutcomeProto::Query(outcome.try_into()?),
FunctionOutcome::Mutation(outcome) => OutcomeProto::Mutation(outcome.try_into()?),
FunctionOutcome::Action(outcome) => OutcomeProto::Action(outcome.try_into()?),
FunctionOutcome::HttpAction(_) => {
anyhow::bail!("Funrun does not support http actions")
},
FunctionOutcome::HttpAction(outcome) => OutcomeProto::HttpAction(outcome.try_into()?),
};
Ok(FunctionOutcomeProto {
outcome: Some(outcome),
Expand Down
Loading

0 comments on commit 372b67b

Please sign in to comment.