diff --git a/Cargo.lock b/Cargo.lock index 736faaec..f31acb41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5032,6 +5032,7 @@ dependencies = [ "anyhow", "convex_sync_types", "errors", + "http 1.1.0", "pb_build", "proptest", "prost", diff --git a/crates/function_runner/src/server.rs b/crates/function_runner/src/server.rs index df234d26..a65a39b5 100644 --- a/crates/function_runner/src/server.rs +++ b/crates/function_runner/src/server.rs @@ -148,6 +148,7 @@ pub struct RunRequestArgs { pub context: ExecutionContext, } +#[derive(Clone)] pub struct FunctionMetadata { pub path_and_args: ValidatedPathAndArgs, pub journal: QueryJournal, @@ -376,37 +377,19 @@ impl> FunctionRunnerCore { pub async fn run_function_no_retention_check( &self, run_request_args: RunRequestArgs, - function_metadata: FunctionMetadata, + function_metadata: Option, + http_action_metadata: Option, ) -> anyhow::Result<( Option, FunctionOutcome, FunctionUsageStats, )> { - if run_request_args.udf_type == UdfType::HttpAction { - anyhow::bail!("You can't run http actions from this method"); - } - self.run_function_no_retention_check_inner(run_request_args, Some(function_metadata), None) - .await - } - - #[minitrace::trace] - pub async fn run_http_action_no_retention_check( - &self, - run_request_args: RunRequestArgs, - http_action_metadata: HttpActionMetadata, - ) -> anyhow::Result<(FunctionOutcome, FunctionUsageStats)> { - if run_request_args.udf_type != UdfType::HttpAction { - anyhow::bail!("You can only run http actions with this method"); - } - let (_, outcome, stats) = self - .run_function_no_retention_check_inner( - run_request_args, - None, - Some(http_action_metadata), - ) - .await?; - - Ok((outcome, stats)) + self.run_function_no_retention_check_inner( + run_request_args, + function_metadata, + http_action_metadata, + ) + .await } #[minitrace::trace] @@ -739,22 +722,13 @@ impl FunctionRunner for InProcessFunctionRunner { let result = match udf_type { UdfType::Query | UdfType::Mutation | UdfType::Action => { self.server - .run_function_no_retention_check( - request_metadata, - function_metadata - .context("Missing function metadata for query, mutation or action")?, - ) + .run_function_no_retention_check(request_metadata, function_metadata, None) .await }, UdfType::HttpAction => { - let (outcome, stats) = self - .server - .run_http_action_no_retention_check( - request_metadata, - http_action_metadata.context("Missing http action metadata")?, - ) - .await?; - Ok((None, outcome, stats)) + self.server + .run_function_no_retention_check(request_metadata, None, http_action_metadata) + .await }, }; validate_run_function_result(udf_type, *ts, self.database.retention_validator()).await?; diff --git a/crates/isolate/src/environment/action/mod.rs b/crates/isolate/src/environment/action/mod.rs index 56511499..dbfb2ea1 100644 --- a/crates/isolate/src/environment/action/mod.rs +++ b/crates/isolate/src/environment/action/mod.rs @@ -182,6 +182,10 @@ use crate::{ }; #[derive(Debug, Clone)] +#[cfg_attr( + any(test, feature = "testing"), + derive(proptest_derive::Arbitrary, PartialEq) +)] pub enum HttpActionResult { Streamed, Error(JsError), diff --git a/crates/isolate/src/environment/action/outcome.rs b/crates/isolate/src/environment/action/outcome.rs index 36fd8685..647dbb2f 100644 --- a/crates/isolate/src/environment/action/outcome.rs +++ b/crates/isolate/src/environment/action/outcome.rs @@ -16,14 +16,20 @@ use pb::{ function_result::Result as FunctionResultTypeProto, FunctionResult as FunctionResultProto, }, - outcome::ActionOutcome as ActionOutcomeProto, + outcome::{ + ActionOutcome as ActionOutcomeProto, + HttpActionOutcome as HttpActionOutcomeProto, + }, }; #[cfg(any(test, feature = "testing"))] use proptest::prelude::*; +use semver::Version; use serde_json::Value as JsonValue; use value::ConvexValue; use super::HttpActionResult; +#[cfg(any(test, feature = "testing"))] +use crate::HttpActionRequest; use crate::{ environment::helpers::{ JsonPackedValue, @@ -164,6 +170,7 @@ impl Arbitrary for ActionOutcome { } #[derive(Debug, Clone)] +#[cfg_attr(any(test, feature = "testing"), derive(PartialEq))] pub struct HttpActionOutcome { pub route: HttpActionRoute, pub http_request: HttpActionRequestHead, @@ -207,6 +214,108 @@ impl HttpActionOutcome { pub fn memory_in_mb(&self) -> u64 { self.memory_in_mb } + + pub(crate) fn from_proto( + HttpActionOutcomeProto { + unix_timestamp, + result, + syscall_trace, + memory_in_mb, + }: HttpActionOutcomeProto, + http_request: HttpActionRequestHead, + udf_server_version: Option, + identity: InertIdentity, + ) -> anyhow::Result { + let result = result.ok_or_else(|| anyhow::anyhow!("Missing result"))?; + let result = match result.result { + Some(FunctionResultTypeProto::JsonPackedValue(_)) => { + anyhow::bail!("Http actions not expected to have aresult") + }, + Some(FunctionResultTypeProto::JsError(js_error)) => { + HttpActionResult::Error(js_error.try_into()?) + }, + None => HttpActionResult::Streamed, + }; + Ok(Self { + identity, + unix_timestamp: unix_timestamp + .context("Missing unix_timestamp")? + .try_into()?, + result, + syscall_trace: syscall_trace.context("Missing syscall_trace")?.try_into()?, + memory_in_mb, + http_request: http_request.clone(), + udf_server_version, + route: HttpActionRoute { + method: http_request.method.try_into()?, + path: http_request.url.to_string(), + }, + }) + } +} + +impl TryFrom for HttpActionOutcomeProto { + type Error = anyhow::Error; + + fn try_from( + HttpActionOutcome { + route: _, + http_request: _, + identity: _, + unix_timestamp, + result, + syscall_trace, + udf_server_version: _, + memory_in_mb, + }: HttpActionOutcome, + ) -> anyhow::Result { + let result = match result { + HttpActionResult::Streamed => None, + HttpActionResult::Error(js_error) => { + Some(FunctionResultTypeProto::JsError(js_error.try_into()?)) + }, + }; + Ok(Self { + unix_timestamp: Some(unix_timestamp.into()), + result: Some(FunctionResultProto { result }), + syscall_trace: Some(syscall_trace.try_into()?), + memory_in_mb, + }) + } +} + +#[cfg(any(test, feature = "testing"))] +impl Arbitrary for HttpActionOutcome { + type Parameters = (); + + type Strategy = impl Strategy; + + fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { + ( + any::(), + any::(), + any::(), + any::(), + any::(), + any::(), + ) + .prop_map( + |(request, result, identity, unix_timestamp, syscall_trace, memory_in_mb)| Self { + http_request: request.head.clone(), + result, + route: HttpActionRoute { + method: request.head.method.try_into().unwrap(), + path: request.head.url.to_string(), + }, + identity, + unix_timestamp, + syscall_trace, + memory_in_mb, + // Ok to not generate semver::Version because it is not serialized anyway + udf_server_version: None, + }, + ) + } } #[cfg(test)] @@ -216,8 +325,10 @@ mod tests { use super::{ ActionOutcome, ActionOutcomeProto, + HttpActionOutcomeProto, ValidatedPathAndArgs, }; + use crate::HttpActionOutcome; proptest! { #![proptest_config( @@ -244,5 +355,21 @@ mod tests { ).unwrap(); assert_eq!(udf_outcome, udf_outcome_from_proto); } + + #[test] + fn test_http_action_outcome_roundtrips(udf_outcome in any::()) { + let udf_outcome_clone = udf_outcome.clone(); + let http_request = udf_outcome.http_request.clone(); + let version = udf_outcome.udf_server_version.clone(); + let identity = udf_outcome_clone.identity.clone(); + let proto = HttpActionOutcomeProto::try_from(udf_outcome_clone).unwrap(); + let udf_outcome_from_proto = HttpActionOutcome::from_proto( + proto, + http_request, + version, + identity, + ).unwrap(); + assert_eq!(udf_outcome, udf_outcome_from_proto); + } } } diff --git a/crates/isolate/src/environment/helpers/outcome.rs b/crates/isolate/src/environment/helpers/outcome.rs index c0437670..51c18689 100644 --- a/crates/isolate/src/environment/helpers/outcome.rs +++ b/crates/isolate/src/environment/helpers/outcome.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use common::identity::InertIdentity; use pb::outcome::{ function_outcome::Outcome as OutcomeProto, @@ -12,6 +13,8 @@ use crate::{ }, udf::outcome::UdfOutcome, }, + HttpActionRequestHead, + ValidatedHttpPath, ValidatedPathAndArgs, }; @@ -32,22 +35,40 @@ pub enum FunctionOutcome { impl FunctionOutcome { pub fn from_proto( FunctionOutcomeProto { outcome }: FunctionOutcomeProto, - path_and_args: ValidatedPathAndArgs, + path_and_args: Option, + http_metadata: Option<(ValidatedHttpPath, HttpActionRequestHead)>, identity: InertIdentity, ) -> anyhow::Result { let outcome = outcome.ok_or_else(|| anyhow::anyhow!("Missing outcome"))?; match outcome { OutcomeProto::Query(outcome) => Ok(FunctionOutcome::Query(UdfOutcome::from_proto( outcome, - path_and_args, + path_and_args.context("Missing path and args")?, identity, )?)), - OutcomeProto::Mutation(outcome) => Ok(FunctionOutcome::Mutation( - UdfOutcome::from_proto(outcome, path_and_args, identity)?, - )), - OutcomeProto::Action(outcome) => Ok(FunctionOutcome::Action( - ActionOutcome::from_proto(outcome, path_and_args, identity)?, - )), + OutcomeProto::Mutation(outcome) => { + Ok(FunctionOutcome::Mutation(UdfOutcome::from_proto( + outcome, + path_and_args.context("Missing path and args")?, + identity, + )?)) + }, + OutcomeProto::Action(outcome) => { + Ok(FunctionOutcome::Action(ActionOutcome::from_proto( + outcome, + path_and_args.context("Missing path and args")?, + identity, + )?)) + }, + OutcomeProto::HttpAction(outcome) => { + let (http_path, http_request) = http_metadata.context("Missing http metadata")?; + Ok(FunctionOutcome::HttpAction(HttpActionOutcome::from_proto( + outcome, + http_request, + http_path.npm_version().clone(), + identity, + )?)) + }, } } } @@ -60,9 +81,7 @@ impl TryFrom for FunctionOutcomeProto { FunctionOutcome::Query(outcome) => OutcomeProto::Query(outcome.try_into()?), FunctionOutcome::Mutation(outcome) => OutcomeProto::Mutation(outcome.try_into()?), FunctionOutcome::Action(outcome) => OutcomeProto::Action(outcome.try_into()?), - FunctionOutcome::HttpAction(_) => { - anyhow::bail!("Funrun does not support http actions") - }, + FunctionOutcome::HttpAction(outcome) => OutcomeProto::HttpAction(outcome.try_into()?), }; Ok(FunctionOutcomeProto { outcome: Some(outcome), diff --git a/crates/isolate/src/http_action.rs b/crates/isolate/src/http_action.rs index 0308e5b0..748f8ac5 100644 --- a/crates/isolate/src/http_action.rs +++ b/crates/isolate/src/http_action.rs @@ -1,9 +1,12 @@ use core::fmt; use bytes::Bytes; -use common::types::{ - HttpActionRoute, - RoutableMethod, +use common::{ + http::normalize_header_map, + types::{ + HttpActionRoute, + RoutableMethod, + }, }; use futures::{ channel::mpsc, @@ -18,6 +21,7 @@ use http::{ Method, StatusCode, }; +use pb::common::HttpHeader; use serde_json::Value as JsonValue; use url::Url; use value::sha256::{ @@ -67,6 +71,30 @@ impl HttpActionRequestHead { } } +impl TryFrom for HttpActionRequestHead { + type Error = anyhow::Error; + + fn try_from( + pb::common::HttpActionRequestHead { + http_headers, + url, + method, + }: pb::common::HttpActionRequestHead, + ) -> Result { + let headers = http_headers + .into_iter() + .map(TryInto::try_into) + .collect::>()?; + let url = Url::parse(&url)?; + let method = method.parse()?; + Ok(Self { + headers, + url, + method, + }) + } +} + #[cfg(any(test, feature = "testing"))] impl proptest::arbitrary::Arbitrary for HttpActionRequest { type Parameters = (); @@ -81,13 +109,12 @@ impl proptest::arbitrary::Arbitrary for HttpActionRequest { use proptest::prelude::*; use proptest_http::{ ArbitraryHeaderMap, - ArbitraryMethod, ArbitraryUri, }; prop_compose! { fn inner()( ArbitraryHeaderMap(headers) in any::(), - ArbitraryMethod(method) in any::(), + method in any::(), ArbitraryUri(uri) in any::(), body in any::>>()) -> anyhow::Result { let origin: String = "http://example-deployment.convex.site/".to_string(); @@ -96,7 +123,7 @@ impl proptest::arbitrary::Arbitrary for HttpActionRequest { Ok(HttpActionRequest { head: HttpActionRequestHead { headers, - method, + method: method.to_string().parse()?, url, }, body: body.map(|body| stream::once(async move { Ok(body.into())}).boxed()) @@ -190,6 +217,37 @@ pub struct HttpActionResponseHead { pub headers: HeaderMap, } +impl TryFrom for HttpActionResponseHead { + type Error = anyhow::Error; + + fn try_from( + pb::common::HttpActionResponseHead { + status, + http_headers, + }: pb::common::HttpActionResponseHead, + ) -> Result { + let status = StatusCode::from_u16(status as u16)?; + let headers = http_headers + .into_iter() + .map(TryInto::try_into) + .collect::>()?; + Ok(Self { status, headers }) + } +} + +impl From for pb::common::HttpActionResponseHead { + fn from(HttpActionResponseHead { status, headers }: HttpActionResponseHead) -> Self { + let status = u16::from(status) as u32; + let http_headers = normalize_header_map(headers) + .map(HttpHeader::from) + .collect(); + Self { + status, + http_headers, + } + } +} + #[derive(Debug, Clone)] pub struct HttpActionResponseStreamer { head: Option, diff --git a/crates/pb/Cargo.toml b/crates/pb/Cargo.toml index ee581a1b..37244720 100644 --- a/crates/pb/Cargo.toml +++ b/crates/pb/Cargo.toml @@ -11,6 +11,7 @@ doctest = false anyhow = { workspace = true } convex_sync_types = { path = "../convex/sync_types" } errors = { path = "../errors" } +http = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } tonic = { workspace = true } diff --git a/crates/pb/protos/common.proto b/crates/pb/protos/common.proto index d479544b..ff0dc54c 100644 --- a/crates/pb/protos/common.proto +++ b/crates/pb/protos/common.proto @@ -161,3 +161,27 @@ message Interval { google.protobuf.Empty after_all = 3; } } + +message HttpActionRequestHead { + repeated HttpHeader http_headers = 1; + string url = 2; + string method = 3; +} + +message HttpActionResponseHead { + uint32 status = 1; + repeated HttpHeader http_headers = 2; +} + +message HttpHeader { + string key = 1; + // HTTP header values can contain non-ASCII bytes. + bytes value = 2; +} + +message HttpActionResponse { + oneof message_type { + HttpActionResponseHead head = 1; + bytes body = 2; + } +} diff --git a/crates/pb/protos/outcome.proto b/crates/pb/protos/outcome.proto index e4cb2927..ee39039d 100644 --- a/crates/pb/protos/outcome.proto +++ b/crates/pb/protos/outcome.proto @@ -12,6 +12,7 @@ message FunctionOutcome { UdfOutcome query = 1; UdfOutcome mutation = 2; ActionOutcome action = 3; + HttpActionOutcome http_action = 4; } } @@ -40,6 +41,16 @@ message ActionOutcome { SyscallTrace syscall_trace = 8; } + +message HttpActionOutcome { + google.protobuf.Timestamp unix_timestamp = 1; + + common.FunctionResult result = 2; + SyscallTrace syscall_trace = 3; + + uint64 memory_in_mb = 4; +} + message SyscallTrace { map async_syscalls = 1; } diff --git a/crates/pb/src/http.rs b/crates/pb/src/http.rs new file mode 100644 index 00000000..1b696b9e --- /dev/null +++ b/crates/pb/src/http.rs @@ -0,0 +1,23 @@ +use http::{ + HeaderName, + HeaderValue, +}; + +use crate::common::HttpHeader; + +impl TryFrom for (HeaderName, HeaderValue) { + type Error = anyhow::Error; + + fn try_from(HttpHeader { key, value }: HttpHeader) -> Result { + Ok((HeaderName::try_from(key)?, HeaderValue::from_bytes(&value)?)) + } +} + +impl From<(HeaderName, HeaderValue)> for HttpHeader { + fn from((key, value): (HeaderName, HeaderValue)) -> Self { + HttpHeader { + key: key.to_string(), + value: value.as_bytes().to_owned(), + } + } +} diff --git a/crates/pb/src/lib.rs b/crates/pb/src/lib.rs index 6846af5e..fe47d0cc 100644 --- a/crates/pb/src/lib.rs +++ b/crates/pb/src/lib.rs @@ -4,6 +4,7 @@ pub mod authentication_token; pub mod document_id; pub mod error_metadata; pub mod field_path; +pub mod http; pub mod user_identity_attributes; pub mod common { include!(concat!(env!("OUT_DIR"), "/common.rs"));