From a01d0101d45fdce47daa57130a4568fec51cdaa8 Mon Sep 17 00:00:00 2001 From: Sujay Jayakar Date: Sun, 4 Aug 2024 16:13:09 -0400 Subject: [PATCH] Resubmit export PRs (#28698) GitOrigin-RevId: 6f5d9a5e47e4aa4552ccb4fd1a170f43ba118a09 --- crates/application/src/api.rs | 151 ++++++++++++- .../src/application_function_runner/mod.rs | 6 +- crates/application/src/lib.rs | 2 +- crates/application/src/tests/cron_jobs.rs | 2 +- .../application/src/tests/scheduled_jobs.rs | 2 +- .../common/src/components/function_paths.rs | 34 +++ crates/common/src/components/mod.rs | 1 + .../src/environment/helpers/validation.rs | 4 +- crates/isolate/src/helpers.rs | 9 +- crates/isolate/src/test_helpers.rs | 6 +- crates/local_backend/src/parse.rs | 11 + crates/local_backend/src/public_api.rs | 52 ++--- crates/model/src/components/type_checking.rs | 69 +++--- crates/sync/src/worker.rs | 172 +++++++++------ .../src/server/components/definition.ts | 2 +- .../convex/src/server/components/index.ts | 206 ++++++++++++++---- 16 files changed, 525 insertions(+), 204 deletions(-) diff --git a/crates/application/src/api.rs b/crates/application/src/api.rs index 37ccaa63..2ddfd5b4 100644 --- a/crates/application/src/api.rs +++ b/crates/application/src/api.rs @@ -9,6 +9,8 @@ use common::{ components::{ CanonicalizedComponentFunctionPath, ComponentId, + ComponentPath, + ExportPath, }, http::ResolvedHost, pause::PauseClient, @@ -96,7 +98,25 @@ pub trait ApplicationApi: Send + Sync { auth_token: AuthenticationToken, ) -> anyhow::Result; + /// Execute a public query on the root app. This method is used by the sync + /// worker and HTTP API for the majority of traffic as the main entry point + /// for queries. async fn execute_public_query( + &self, + host: &ResolvedHost, + request_id: RequestId, + identity: Identity, + path: ExportPath, + args: Vec, + caller: FunctionCaller, + ts: ExecuteQueryTimestamp, + journal: Option, + ) -> anyhow::Result; + + /// Execute an admin query for a particular component. This method is used + /// by the sync worker for running queries for the dashboard and only works + /// for admin or system identity. + async fn execute_admin_query( &self, host: &ResolvedHost, request_id: RequestId, @@ -108,19 +128,21 @@ pub trait ApplicationApi: Send + Sync { journal: Option, ) -> anyhow::Result; + /// Execute a public mutation on the root app. async fn execute_public_mutation( &self, host: &ResolvedHost, request_id: RequestId, identity: Identity, - path: CanonicalizedComponentFunctionPath, + path: ExportPath, args: Vec, caller: FunctionCaller, // Identifier used to make this mutation idempotent. mutation_identifier: Option, ) -> anyhow::Result>; - async fn execute_public_action( + /// Execute an admin mutation for a particular component for the dashboard. + async fn execute_admin_mutation( &self, host: &ResolvedHost, request_id: RequestId, @@ -128,24 +150,32 @@ pub trait ApplicationApi: Send + Sync { path: CanonicalizedComponentFunctionPath, args: Vec, caller: FunctionCaller, - ) -> anyhow::Result>; + mutation_identifier: Option, + ) -> anyhow::Result>; - async fn execute_any_function( + /// Execute a public action on the root app. + async fn execute_public_action( &self, host: &ResolvedHost, request_id: RequestId, identity: Identity, - path: CanonicalizedComponentFunctionPath, + path: ExportPath, args: Vec, caller: FunctionCaller, - ) -> anyhow::Result>; + ) -> anyhow::Result>; - async fn latest_timestamp( + /// Execute an admin action for a particular component for the dashboard. + async fn execute_admin_action( &self, host: &ResolvedHost, request_id: RequestId, - ) -> anyhow::Result; + identity: Identity, + path: CanonicalizedComponentFunctionPath, + args: Vec, + caller: FunctionCaller, + ) -> anyhow::Result>; + /// Execute an HTTP action on the root app. async fn execute_http_action( &self, host: &ResolvedHost, @@ -156,6 +186,25 @@ pub trait ApplicationApi: Send + Sync { response_streamer: HttpActionResponseStreamer, ) -> anyhow::Result<()>; + /// For the dashboard (and the CLI), run any function in any component + /// without knowing its type. This function requires admin identity for + /// calling functions outside the root component. + async fn execute_any_function( + &self, + host: &ResolvedHost, + request_id: RequestId, + identity: Identity, + path: CanonicalizedComponentFunctionPath, + args: Vec, + caller: FunctionCaller, + ) -> anyhow::Result>; + + async fn latest_timestamp( + &self, + host: &ResolvedHost, + request_id: RequestId, + ) -> anyhow::Result; + async fn check_store_file_authorization( &self, host: &ResolvedHost, @@ -226,7 +275,7 @@ impl ApplicationApi for Application { _host: &ResolvedHost, request_id: RequestId, identity: Identity, - path: CanonicalizedComponentFunctionPath, + path: ExportPath, args: Vec, caller: FunctionCaller, ts: ExecuteQueryTimestamp, @@ -236,7 +285,34 @@ impl ApplicationApi for Application { caller.allowed_visibility() == AllowedVisibility::PublicOnly, "This method should not be used by internal callers." ); + let ts = match ts { + ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(), + ExecuteQueryTimestamp::At(ts) => ts, + }; + // Public queries always start with the root component. + let path = CanonicalizedComponentFunctionPath { + component: ComponentPath::root(), + udf_path: path.into(), + }; + self.read_only_udf_at_ts(request_id, path, args, identity, ts, journal, caller) + .await + } + async fn execute_admin_query( + &self, + _host: &ResolvedHost, + request_id: RequestId, + identity: Identity, + path: CanonicalizedComponentFunctionPath, + args: Vec, + caller: FunctionCaller, + ts: ExecuteQueryTimestamp, + journal: Option, + ) -> anyhow::Result { + anyhow::ensure!( + path.component.is_root() || identity.is_admin() || identity.is_system(), + "Only admin or system users can call functions on non-root components directly" + ); let ts = match ts { ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(), ExecuteQueryTimestamp::At(ts) => ts, @@ -250,7 +326,7 @@ impl ApplicationApi for Application { _host: &ResolvedHost, request_id: RequestId, identity: Identity, - path: CanonicalizedComponentFunctionPath, + path: ExportPath, args: Vec, caller: FunctionCaller, // Identifier used to make this mutation idempotent. @@ -260,7 +336,36 @@ impl ApplicationApi for Application { caller.allowed_visibility() == AllowedVisibility::PublicOnly, "This method should not be used by internal callers." ); + let path = CanonicalizedComponentFunctionPath { + component: ComponentPath::root(), + udf_path: path.into(), + }; + self.mutation_udf( + request_id, + path, + args, + identity, + mutation_identifier, + caller, + PauseClient::new(), + ) + .await + } + async fn execute_admin_mutation( + &self, + _host: &ResolvedHost, + request_id: RequestId, + identity: Identity, + path: CanonicalizedComponentFunctionPath, + args: Vec, + caller: FunctionCaller, + mutation_identifier: Option, + ) -> anyhow::Result> { + anyhow::ensure!( + path.component.is_root() || identity.is_admin() || identity.is_system(), + "Only admin or system users can call functions on non-root components directly" + ); self.mutation_udf( request_id, path, @@ -278,7 +383,7 @@ impl ApplicationApi for Application { _host: &ResolvedHost, request_id: RequestId, identity: Identity, - path: CanonicalizedComponentFunctionPath, + path: ExportPath, args: Vec, caller: FunctionCaller, ) -> anyhow::Result> { @@ -286,7 +391,27 @@ impl ApplicationApi for Application { caller.allowed_visibility() == AllowedVisibility::PublicOnly, "This method should not be used by internal callers." ); + let path = CanonicalizedComponentFunctionPath { + component: ComponentPath::root(), + udf_path: path.into(), + }; + self.action_udf(request_id, path, args, identity, caller) + .await + } + async fn execute_admin_action( + &self, + _host: &ResolvedHost, + request_id: RequestId, + identity: Identity, + path: CanonicalizedComponentFunctionPath, + args: Vec, + caller: FunctionCaller, + ) -> anyhow::Result> { + anyhow::ensure!( + path.component.is_root() || identity.is_admin() || identity.is_system(), + "Only admin or system users can call functions on non-root components directly" + ); self.action_udf(request_id, path, args, identity, caller) .await } @@ -300,6 +425,10 @@ impl ApplicationApi for Application { args: Vec, caller: FunctionCaller, ) -> anyhow::Result> { + anyhow::ensure!( + path.component.is_root() || identity.is_admin() || identity.is_system(), + "Only admin or system users can call functions on non-root components directly" + ); self.any_udf(request_id, path, args, identity, caller).await } diff --git a/crates/application/src/application_function_runner/mod.rs b/crates/application/src/application_function_runner/mod.rs index 47b7fa77..3f93a20d 100644 --- a/crates/application/src/application_function_runner/mod.rs +++ b/crates/application/src/application_function_runner/mod.rs @@ -762,7 +762,7 @@ impl ApplicationFunctionRunner { if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("mutation")); } - let arguments = match parse_udf_args(&path, arguments) { + let arguments = match parse_udf_args(&path.udf_path, arguments) { Ok(arguments) => arguments, Err(error) => { return Ok(Err(MutationError { @@ -1032,7 +1032,7 @@ impl ApplicationFunctionRunner { if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("action")); } - let arguments = match parse_udf_args(&path, arguments) { + let arguments = match parse_udf_args(&path.udf_path, arguments) { Ok(arguments) => arguments, Err(error) => { return Ok(Err(ActionError { @@ -1673,7 +1673,7 @@ impl ApplicationFunctionRunner { if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("query")); } - let args = match parse_udf_args(&path, args) { + let args = match parse_udf_args(&path.udf_path, args) { Ok(arguments) => arguments, Err(js_error) => { return Ok(QueryReturn { diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index 636718f7..901fb01c 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -2575,7 +2575,7 @@ impl Application { component: ComponentPath::TODO(), udf_path: CanonicalizedUdfPath::new(module_path, function_name), }; - let arguments = parse_udf_args(&path, args)?; + let arguments = parse_udf_args(&path.udf_path, args)?; let (result, log_lines) = match analyzed_function.udf_type { UdfType::Query => self .runner diff --git a/crates/application/src/tests/cron_jobs.rs b/crates/application/src/tests/cron_jobs.rs index 6ddab827..08f4f4c4 100644 --- a/crates/application/src/tests/cron_jobs.rs +++ b/crates/application/src/tests/cron_jobs.rs @@ -78,7 +78,7 @@ async fn create_cron_job( }; let cron_spec = CronSpec { udf_path: path.udf_path.clone(), - udf_args: parse_udf_args(&path, vec![JsonValue::Object(map)])?, + udf_args: parse_udf_args(&path.udf_path, vec![JsonValue::Object(map)])?, cron_schedule: CronSchedule::Interval { seconds: 60 }, }; let original_jobs = cron_model.list().await?; diff --git a/crates/application/src/tests/scheduled_jobs.rs b/crates/application/src/tests/scheduled_jobs.rs index 4241e4d0..871f9128 100644 --- a/crates/application/src/tests/scheduled_jobs.rs +++ b/crates/application/src/tests/scheduled_jobs.rs @@ -81,7 +81,7 @@ async fn create_scheduled_job<'a>( let job_id = model .schedule( path.clone(), - parse_udf_args(&path, vec![JsonValue::Object(map)])?, + parse_udf_args(&path.udf_path, vec![JsonValue::Object(map)])?, rt.unix_timestamp(), ExecutionContext::new_for_test(), ) diff --git a/crates/common/src/components/function_paths.rs b/crates/common/src/components/function_paths.rs index a1d88c7a..c9d787c7 100644 --- a/crates/common/src/components/function_paths.rs +++ b/crates/common/src/components/function_paths.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + use serde::{ Deserialize, Serialize, @@ -113,3 +115,35 @@ impl HeapSize for CanonicalizedComponentFunctionPath { self.component.heap_size() + self.udf_path.heap_size() } } + +pub struct ExportPath { + path: CanonicalizedUdfPath, +} + +impl From for ExportPath { + fn from(path: CanonicalizedUdfPath) -> Self { + Self { path } + } +} + +impl From for CanonicalizedUdfPath { + fn from(p: ExportPath) -> Self { + p.path + } +} + +impl FromStr for ExportPath { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + Ok(Self { + path: CanonicalizedUdfPath::from_str(s)?, + }) + } +} + +impl From for String { + fn from(p: ExportPath) -> Self { + p.path.to_string() + } +} diff --git a/crates/common/src/components/mod.rs b/crates/common/src/components/mod.rs index 47917a78..551d73fd 100644 --- a/crates/common/src/components/mod.rs +++ b/crates/common/src/components/mod.rs @@ -22,6 +22,7 @@ pub use self::{ CanonicalizedComponentFunctionPath, ComponentDefinitionFunctionPath, ComponentFunctionPath, + ExportPath, }, module_paths::CanonicalizedComponentModulePath, reference::Reference, diff --git a/crates/isolate/src/environment/helpers/validation.rs b/crates/isolate/src/environment/helpers/validation.rs index bee01281..e279cbe7 100644 --- a/crates/isolate/src/environment/helpers/validation.rs +++ b/crates/isolate/src/environment/helpers/validation.rs @@ -93,7 +93,7 @@ pub async fn validate_schedule_args( } // We do serialize the arguments, so this is likely our fault. - let udf_args = parse_udf_args(&path, udf_args)?; + let udf_args = parse_udf_args(&path.udf_path, udf_args)?; // Even though we might use different version of modules when executing, // we do validate that the scheduled function exists at time of scheduling. @@ -435,7 +435,7 @@ impl ValidatedPathAndArgs { )))); } - match validate_udf_args_size(&path, &args) { + match validate_udf_args_size(&path.udf_path, &args) { Ok(()) => (), Err(err) => return Ok(Err(err)), } diff --git a/crates/isolate/src/helpers.rs b/crates/isolate/src/helpers.rs index 0ff23119..f596f235 100644 --- a/crates/isolate/src/helpers.rs +++ b/crates/isolate/src/helpers.rs @@ -22,6 +22,7 @@ use humansize::{ }; use serde::Deserialize; use serde_json::Value as JsonValue; +use sync_types::CanonicalizedUdfPath; use value::Size; use crate::strings; @@ -104,7 +105,7 @@ pub fn serialize_udf_args(args: ConvexArray) -> anyhow::Result { } pub fn parse_udf_args( - path: &CanonicalizedComponentFunctionPath, + path: &CanonicalizedUdfPath, args: Vec, ) -> Result { args.into_iter() @@ -114,19 +115,19 @@ pub fn parse_udf_args( .map_err(|err| { JsError::from_message(format!( "Invalid arguments for {}: {err}", - String::from(path.udf_path.clone()), + String::from(path.clone()), )) }) } pub fn validate_udf_args_size( - path: &CanonicalizedComponentFunctionPath, + path: &CanonicalizedUdfPath, args: &ConvexArray, ) -> Result<(), JsError> { if args.size() > *FUNCTION_MAX_ARGS_SIZE { return Err(JsError::from_message(format!( "Arguments for {} are too large (actual: {}, limit: {})", - path.udf_path.clone(), + path.clone(), args.size().format_size(BINARY), (*FUNCTION_MAX_ARGS_SIZE).format_size(BINARY), ))); diff --git a/crates/isolate/src/test_helpers.rs b/crates/isolate/src/test_helpers.rs index 8cf7bc10..284ff3f9 100644 --- a/crates/isolate/src/test_helpers.rs +++ b/crates/isolate/src/test_helpers.rs @@ -1223,7 +1223,7 @@ impl ActionCallbacks for UdfTest { args: Vec, _context: ExecutionContext, ) -> anyhow::Result { - let arguments = parse_udf_args(&path, args)?; + let arguments = parse_udf_args(&path.udf_path, args)?; let str_name = String::from(path.udf_path); let outcome = self .raw_query(&str_name, arguments.into(), identity, None) @@ -1243,7 +1243,7 @@ impl ActionCallbacks for UdfTest { args: Vec, _context: ExecutionContext, ) -> anyhow::Result { - let arguments = parse_udf_args(&path, args)?; + let arguments = parse_udf_args(&path.udf_path, args)?; let str_name = String::from(path.udf_path); let outcome = self .raw_mutation(&str_name, arguments.into(), identity) @@ -1263,7 +1263,7 @@ impl ActionCallbacks for UdfTest { args: Vec, _context: ExecutionContext, ) -> anyhow::Result { - let arguments = parse_udf_args(&path, args)?; + let arguments = parse_udf_args(&path.udf_path, args)?; let str_name = String::from(path.udf_path); let (outcome, _) = self .raw_action(&str_name, arguments.into(), identity) diff --git a/crates/local_backend/src/parse.rs b/crates/local_backend/src/parse.rs index 003cba85..0993c93c 100644 --- a/crates/local_backend/src/parse.rs +++ b/crates/local_backend/src/parse.rs @@ -1,3 +1,4 @@ +use common::components::ExportPath; use errors::ErrorMetadata; use sync_types::CanonicalizedUdfPath; use value::{ @@ -7,6 +8,16 @@ use value::{ TableName, }; +pub fn parse_export_path(path: &str) -> anyhow::Result { + path.parse().map_err(|e: anyhow::Error| { + let msg = format!("{path} is not a valid path to a Convex function. {e}"); + e.context(ErrorMetadata::bad_request( + "BadConvexFunctionIdentifier", + msg, + )) + }) +} + pub fn parse_udf_path(path: &str) -> anyhow::Result { path.parse().map_err(|e: anyhow::Error| { let msg = format!("{path} is not a valid path to a Convex function. {e}"); diff --git a/crates/local_backend/src/public_api.rs b/crates/local_backend/src/public_api.rs index ba83f5c1..d5bc0bba 100644 --- a/crates/local_backend/src/public_api.rs +++ b/crates/local_backend/src/public_api.rs @@ -10,10 +10,7 @@ use axum::{ response::IntoResponse, }; use common::{ - components::{ - CanonicalizedComponentFunctionPath, - ComponentPath, - }, + components::CanonicalizedComponentFunctionPath, http::{ extract::{ Json, @@ -43,7 +40,10 @@ use value::{ use crate::{ args_structs::UdfPostRequestWithComponent, authentication::ExtractAuthenticationToken, - parse::parse_udf_path, + parse::{ + parse_export_path, + parse_udf_path, + }, RouterState, }; @@ -259,7 +259,7 @@ pub async fn public_query_get( ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, ExtractClientVersion(client_version): ExtractClientVersion, ) -> Result { - let udf_path = parse_udf_path(&req.path)?; + let export_path = parse_export_path(&req.path)?; let args = req.args.into_arg_vec(); let journal = None; // NOTE: We could coalesce authenticating and executing the query into one @@ -276,10 +276,7 @@ pub async fn public_query_get( &host, request_id, identity, - CanonicalizedComponentFunctionPath { - component: ComponentPath::TODO(), - udf_path, - }, + export_path, args, FunctionCaller::HttpApi(client_version.clone()), ExecuteQueryTimestamp::Latest, @@ -307,7 +304,7 @@ pub async fn public_query_post( ExtractClientVersion(client_version): ExtractClientVersion, Json(req): Json, ) -> Result { - let udf_path = parse_udf_path(&req.path)?; + let udf_path = parse_export_path(&req.path)?; let journal = None; // NOTE: We could coalesce authenticating and executing the query into one // rpc but we keep things simple by reusing the same method as the sync worker. @@ -323,10 +320,7 @@ pub async fn public_query_post( &host, request_id, identity, - CanonicalizedComponentFunctionPath { - component: ComponentPath::TODO(), - udf_path, - }, + udf_path, req.args.into_arg_vec(), FunctionCaller::HttpApi(client_version.clone()), ExecuteQueryTimestamp::Latest, @@ -364,7 +358,7 @@ pub async fn public_query_at_ts_post( ExtractClientVersion(client_version): ExtractClientVersion, Json(req): Json, ) -> Result { - let udf_path = parse_udf_path(&req.path)?; + let export_path = parse_export_path(&req.path)?; let journal = None; // NOTE: We could coalesce authenticating and executing the query into one // rpc but we keep things simple by reusing the same method as the sync worker. @@ -381,10 +375,7 @@ pub async fn public_query_at_ts_post( &host, request_id, identity, - CanonicalizedComponentFunctionPath { - component: ComponentPath::root(), - udf_path, - }, + export_path, req.args.into_arg_vec(), FunctionCaller::HttpApi(client_version.clone()), ExecuteQueryTimestamp::At(ts), @@ -431,17 +422,14 @@ pub async fn public_query_batch_post( .await?; for req in req_batch.queries { let value_format = req.format.as_ref().map(|f| f.parse()).transpose()?; - let udf_path = parse_udf_path(&req.path)?; + let export_path = parse_export_path(&req.path)?; let udf_return = st .api .execute_public_query( &host, request_id.clone(), identity.clone(), - CanonicalizedComponentFunctionPath { - component: ComponentPath::TODO(), - udf_path, - }, + export_path, req.args.into_arg_vec(), FunctionCaller::HttpApi(client_version.clone()), ExecuteQueryTimestamp::At(*ts), @@ -474,7 +462,7 @@ pub async fn public_mutation_post( ExtractClientVersion(client_version): ExtractClientVersion, Json(req): Json, ) -> Result { - let udf_path = parse_udf_path(&req.path)?; + let export_path = parse_export_path(&req.path)?; // NOTE: We could coalesce authenticating and executing the query into one // rpc but we keep things simple by reusing the same method as the sync worker. // Round trip latency between Usher and Backend is much smaller than between @@ -489,10 +477,7 @@ pub async fn public_mutation_post( &host, request_id, identity, - CanonicalizedComponentFunctionPath { - component: ComponentPath::TODO(), - udf_path, - }, + export_path, req.args.into_arg_vec(), FunctionCaller::HttpApi(client_version.clone()), None, @@ -523,7 +508,7 @@ pub async fn public_action_post( ExtractClientVersion(client_version): ExtractClientVersion, Json(req): Json, ) -> Result { - let udf_path = parse_udf_path(&req.path)?; + let export_path = parse_export_path(&req.path)?; // NOTE: We could coalesce authenticating and executing the query into one // rpc but we keep things simple by reusing the same method as the sync worker. @@ -539,10 +524,7 @@ pub async fn public_action_post( &host, request_id, identity, - CanonicalizedComponentFunctionPath { - component: ComponentPath::TODO(), - udf_path, - }, + export_path, req.args.into_arg_vec(), FunctionCaller::HttpApi(client_version.clone()), ) diff --git a/crates/model/src/components/type_checking.rs b/crates/model/src/components/type_checking.rs index 3381853f..76129f11 100644 --- a/crates/model/src/components/type_checking.rs +++ b/crates/model/src/components/type_checking.rs @@ -43,12 +43,12 @@ pub struct CheckedComponent { pub args: BTreeMap, pub child_components: BTreeMap, pub http_routes: CheckedHttpRoutes, - pub exports: BTreeMap, + pub exports: BTreeMap, } -#[derive(Debug)] -pub enum CheckedExport { - Branch(BTreeMap), +#[derive(Clone, Debug)] +pub enum ResourceTree { + Branch(BTreeMap), Leaf(Resource), } @@ -283,24 +283,21 @@ impl<'a> CheckedComponentBuilder<'a> { fn resolve_exports( &self, exports: &BTreeMap, - ) -> Result, TypecheckError> { + ) -> Result, TypecheckError> { let mut result = BTreeMap::new(); for (name, export) in exports { let node = match export { ComponentExport::Branch(ref exports) => { - CheckedExport::Branch(self.resolve_exports(exports)?) - }, - ComponentExport::Leaf(ref reference) => { - let resource = self.resolve(reference)?; - CheckedExport::Leaf(resource) + ResourceTree::Branch(self.resolve_exports(exports)?) }, + ComponentExport::Leaf(ref reference) => self.resolve(reference)?, }; result.insert(name.clone(), node); } Ok(result) } - fn resolve(&self, reference: &Reference) -> Result { + fn resolve(&self, reference: &Reference) -> Result { let unresolved_err = || TypecheckError::UnresolvedExport { definition_path: self.definition_path.clone(), reference: reference.clone(), @@ -310,10 +307,12 @@ impl<'a> CheckedComponentBuilder<'a> { if attributes.len() != 1 { return Err(TypecheckError::Unsupported("Nested argument references")); } - self.args + let resource = self + .args .get(&attributes[0]) .ok_or_else(unresolved_err)? - .clone() + .clone(); + ResourceTree::Leaf(resource) }, Reference::Function(path) => { let canonicalized = path.clone(); @@ -327,10 +326,11 @@ impl<'a> CheckedComponentBuilder<'a> { .iter() .find(|f| &f.name == canonicalized.function_name()) .ok_or_else(unresolved_err)?; - Resource::Function(CanonicalizedComponentFunctionPath { + let path = CanonicalizedComponentFunctionPath { component: self.component_path.clone(), udf_path: path.clone(), - }) + }; + ResourceTree::Leaf(Resource::Function(path)) }, Reference::ChildComponent { component, @@ -358,7 +358,7 @@ impl CheckedComponent { pub fn resolve_export( &self, attributes: &[Identifier], - ) -> Result, TypecheckError> { + ) -> Result, TypecheckError> { let mut current = &self.exports; let mut attribute_iter = attributes.iter(); while let Some(attribute) = attribute_iter.next() { @@ -366,21 +366,20 @@ impl CheckedComponent { return Ok(None); }; match export { - CheckedExport::Branch(ref next) => { + ResourceTree::Branch(ref next) => { current = next; continue; }, - CheckedExport::Leaf(ref resource) => { + ResourceTree::Leaf(ref resource) => { if !attribute_iter.as_slice().is_empty() { + // TODO: Should this be a system error? return Err(TypecheckError::Unsupported("Component references")); } - return Ok(Some(resource.clone())); + return Ok(Some(ResourceTree::Leaf(resource.clone()))); }, } } - Err(TypecheckError::Unsupported( - "Intermediate export references", - )) + Ok(Some(ResourceTree::Branch(current.clone()))) } } @@ -498,8 +497,8 @@ mod json { use super::{ CheckedComponent, - CheckedExport, CheckedHttpRoutes, + ResourceTree, }; #[derive(Debug, Serialize, Deserialize)] @@ -511,7 +510,7 @@ mod json { args: BTreeMap, child_components: BTreeMap, http_routes: SerializedCheckedHttpRoutes, - exports: BTreeMap, + exports: BTreeMap, } impl TryFrom for SerializedCheckedComponent { @@ -616,50 +615,50 @@ mod json { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase", tag = "type")] - pub enum SerializedCheckedExport { + pub enum SerializedResourceTree { Branch { - children: BTreeMap, + children: BTreeMap, }, Leaf { resource: SerializedResource, }, } - impl TryFrom for SerializedCheckedExport { + impl TryFrom for SerializedResourceTree { type Error = anyhow::Error; - fn try_from(value: CheckedExport) -> Result { + fn try_from(value: ResourceTree) -> Result { Ok(match value { - CheckedExport::Branch(branch) => Self::Branch { + ResourceTree::Branch(branch) => Self::Branch { children: branch .into_iter() .map(|(k, v)| Ok((String::from(k), v.try_into()?))) .collect::>()?, }, - CheckedExport::Leaf(leaf) => Self::Leaf { + ResourceTree::Leaf(leaf) => Self::Leaf { resource: leaf.try_into()?, }, }) } } - impl TryFrom for CheckedExport { + impl TryFrom for ResourceTree { type Error = anyhow::Error; - fn try_from(value: SerializedCheckedExport) -> Result { + fn try_from(value: SerializedResourceTree) -> Result { Ok(match value { - SerializedCheckedExport::Branch { children } => Self::Branch( + SerializedResourceTree::Branch { children } => Self::Branch( children .into_iter() .map(|(k, v)| Ok((k.parse()?, v.try_into()?))) .collect::>()?, ), - SerializedCheckedExport::Leaf { resource } => Self::Leaf(resource.try_into()?), + SerializedResourceTree::Leaf { resource } => Self::Leaf(resource.try_into()?), }) } } } pub use self::json::{ SerializedCheckedComponent, - SerializedCheckedExport, + SerializedResourceTree, }; diff --git a/crates/sync/src/worker.rs b/crates/sync/src/worker.rs index d04b941d..36001a15 100644 --- a/crates/sync/src/worker.rs +++ b/crates/sync/src/worker.rs @@ -31,6 +31,7 @@ use common::{ components::{ CanonicalizedComponentFunctionPath, ComponentPath, + ExportPath, }, http::ResolvedHost, knobs::SYNC_MAX_SEND_TRANSITION_COUNT, @@ -83,6 +84,7 @@ use sync_types::{ StateModification, StateVersion, Timestamp, + UdfPath, }; use crate::{ @@ -389,15 +391,20 @@ impl SyncWorker { self.state.current_version().identity } - pub fn parse_component_path( - component_path: Option<&str>, + pub fn parse_admin_component_path( + component_path: &str, + udf_path: &UdfPath, identity: &Identity, - ) -> anyhow::Result { - let path = ComponentPath::deserialize(component_path)?; + ) -> anyhow::Result { + let path = ComponentPath::deserialize(Some(component_path))?; anyhow::ensure!( path.is_root() || identity.is_admin() || identity.is_system(), "Only admin or system users can call functions on non-root components directly" ); + let path = CanonicalizedComponentFunctionPath { + component: path, + udf_path: udf_path.clone().canonicalize(), + }; Ok(path) } @@ -477,27 +484,41 @@ impl SyncWorker { let timer = mutation_queue_timer(); let api = self.api.clone(); let host = self.host.clone(); - let component = Self::parse_component_path(component_path.as_deref(), &identity)?; - let path = CanonicalizedComponentFunctionPath { - component, - udf_path: udf_path.clone().canonicalize(), - }; + let caller = FunctionCaller::SyncWorker(client_version); + let future = async move { rt.with_timeout("mutation", SYNC_WORKER_PROCESS_TIMEOUT, async move { timer.finish(); - let result = api - .execute_public_mutation( - &host, - server_request_id, - identity, - path, - args, - FunctionCaller::SyncWorker(client_version), - mutation_identifier, - ) - .in_span(root) - .await?; - + let result = match component_path { + None => { + api.execute_public_mutation( + &host, + server_request_id, + identity, + ExportPath::from(udf_path.canonicalize()), + args, + caller, + mutation_identifier, + ) + .in_span(root) + .await? + }, + Some(ref p) => { + let path = + Self::parse_admin_component_path(p, &udf_path, &identity)?; + api.execute_admin_mutation( + &host, + server_request_id, + identity, + path, + args, + caller, + mutation_identifier, + ) + .in_span(root) + .await? + }, + }; let response = match result { Ok(udf_return) => ServerMessage::MutationResponse { request_id, @@ -549,11 +570,6 @@ impl SyncWorker { Some(id) => RequestId::new_for_ws_session(id, request_id), None => RequestId::new(), }; - let component = Self::parse_component_path(component_path.as_deref(), &identity)?; - let path = CanonicalizedComponentFunctionPath { - component, - udf_path: udf_path.clone().canonicalize(), - }; let root = self.rt.with_rng(|rng| { get_sampled_span( "sync-worker/action", @@ -565,17 +581,34 @@ impl SyncWorker { ) }); let future = async move { - let result = api - .execute_public_action( - &host, - server_request_id, - identity, - path, - args, - FunctionCaller::SyncWorker(client_version), - ) - .in_span(root) - .await?; + let caller = FunctionCaller::SyncWorker(client_version); + let result = match component_path { + None => { + api.execute_public_action( + &host, + server_request_id, + identity, + ExportPath::from(udf_path.canonicalize()), + args, + caller, + ) + .in_span(root) + .await? + }, + Some(ref p) => { + let path = Self::parse_admin_component_path(p, &udf_path, &identity)?; + api.execute_admin_action( + &host, + server_request_id, + identity, + path, + args, + caller, + ) + .in_span(root) + .await? + }, + }; let response = match result { Ok(udf_return) => ServerMessage::ActionResponse { request_id, @@ -721,29 +754,46 @@ impl SyncWorker { None => { // We failed to refresh the subscription or it was invalid to start // with. Rerun the query. - let component = Self::parse_component_path( - query.component_path.as_deref(), - &identity_, - )?; - let udf_return = api - .execute_public_query( - // This query run might have been triggered due to invalidation - // of a subscription. The sync worker is effectively the owner of - // the query so we do not want to re-use the original query request - // id. - &host, - RequestId::new(), - identity_, - CanonicalizedComponentFunctionPath { - component, - udf_path: query.udf_path.canonicalize(), - }, - query.args, - FunctionCaller::SyncWorker(client_version), - ExecuteQueryTimestamp::At(new_ts), - query.journal, - ) - .await?; + let caller = FunctionCaller::SyncWorker(client_version); + let ts = ExecuteQueryTimestamp::At(new_ts); + + // This query run might have been triggered due to invalidation + // of a subscription. The sync worker is effectively the owner + // of the query so we do not want to re-use the original query request id. + let request_id = RequestId::new(); + let udf_return = match query.component_path { + None => { + api.execute_public_query( + &host, + request_id, + identity_, + ExportPath::from(query.udf_path.canonicalize()), + query.args, + caller, + ts, + query.journal, + ) + .await? + }, + Some(ref p) => { + let path = Self::parse_admin_component_path( + p, + &query.udf_path, + &identity_, + )?; + api.execute_admin_query( + &host, + request_id, + identity_, + path, + query.args, + caller, + ts, + query.journal, + ) + .await? + }, + }; let subscription = subscriptions_client.subscribe(udf_return.token).await?; ( QueryResult::Rerun { diff --git a/npm-packages/convex/src/server/components/definition.ts b/npm-packages/convex/src/server/components/definition.ts index 9157369c..cb7be535 100644 --- a/npm-packages/convex/src/server/components/definition.ts +++ b/npm-packages/convex/src/server/components/definition.ts @@ -31,7 +31,7 @@ type ComponentInstantiation = { export type HttpMount = string; type ComponentExport = - | { type: "branch"; branch: ComponentExport[] } + | { type: "branch"; branch: [string, ComponentExport][] } | { type: "leaf"; leaf: string }; // The type expected from the internal .export() diff --git a/npm-packages/convex/src/server/components/index.ts b/npm-packages/convex/src/server/components/index.ts index 33e0af3b..063b72e0 100644 --- a/npm-packages/convex/src/server/components/index.ts +++ b/npm-packages/convex/src/server/components/index.ts @@ -61,42 +61,60 @@ export async function createFunctionHandle< return await performAsyncSyscall("1.0/createFunctionHandle", { udfPath }); } +interface ComponentExports { + [key: string]: FunctionReference | ComponentExports; +} + /** * An object of this type should be the default export of a * component.config.ts file in a component definition directory. * * @internal */ // eslint-disable-next-line @typescript-eslint/ban-types -export type ComponentDefinition = - { - /** - * Install a component with the given definition in this component definition. - * - * Takes a component definition, an optional name, and the args it requires. - * - * For editor tooling this method expects a {@link ComponentDefinition} - * but at runtime the object that is imported will be a {@link ImportedComponentDefinition} - */ - install>( - definition: Definition, - options: { - name?: string; - // TODO we have to do the "arguments are optional if empty, otherwise required" - args?: ObjectType>; - }, - ): InstalledComponent; - - /** - * Mount a component's HTTP router at a given path prefix. - */ - mountHttp(pathPrefix: string, component: InstalledComponent): void; - - // TODO this will be needed once components are responsible for building interfaces for themselves - /** - * @internal - */ - __args: Args; - }; +export type ComponentDefinition< + Args extends PropertyValidators = EmptyObject, + Exports extends ComponentExports = any, +> = { + /** + * Install a component with the given definition in this component definition. + * + * Takes a component definition, an optional name, and the args it requires. + * + * For editor tooling this method expects a {@link ComponentDefinition} + * but at runtime the object that is imported will be a {@link ImportedComponentDefinition} + */ + install>( + definition: Definition, + options: { + name?: string; + // TODO we have to do the "arguments are optional if empty, otherwise required" + args?: ObjectType>; + }, + ): InstalledComponent; + + mount(exports: ComponentExports): void; + + /** + * Mount a component's HTTP router at a given path prefix. + */ + mountHttp(pathPrefix: string, component: InstalledComponent): void; + + // TODO this will be needed once components are responsible for building interfaces for themselves + /** + * @internal + */ + __args: Args; + + /** + * @internal + */ + __exports: Exports; +}; + +type ComponentDefinitionArgs> = + T["__args"]; +type ComponentDefinitionExports> = + T["__exports"]; /** * An object of this type should be the default export of a @@ -113,20 +131,27 @@ export type AppDefinition = { * For editor tooling this method expects a {@link ComponentDefinition} * but at runtime the object that is imported will be a {@link ImportedComponentDefinition} */ - install>( + install>( definition: Definition, options: { name?: string; - args?: ObjectType>; + args?: ObjectType>; }, ): InstalledComponent; + mount(exports: ComponentExports): void; + /** * Mount a component's HTTP router at a given path prefix. */ mountHttp(pathPrefix: string, component: InstalledComponent): void; }; +interface ExportTree { + // Tree with serialized `Reference`s as leaves. + [key: string]: string | ExportTree; +} + type CommonDefinitionData = { _isRoot: boolean; _childComponents: [ @@ -135,6 +160,7 @@ type CommonDefinitionData = { Record, ][]; _httpMounts: Record; + _exportTree: ExportTree; }; type ComponentDefinitionData = CommonDefinitionData & { @@ -143,12 +169,10 @@ type ComponentDefinitionData = CommonDefinitionData & { }; type AppDefinitionData = CommonDefinitionData; -type ExtractArgs = T extends ComponentDefinition ? P : never; - /** * Used to refer to an already-installed component. */ -class InstalledComponent> { +class InstalledComponent> { /** * @internal */ @@ -159,18 +183,45 @@ class InstalledComponent> { */ [toReferencePath]: string; - constructor(definition: Definition, name: string) { + constructor( + definition: Definition, + private _name: string, + ) { this._definition = definition; - this[toReferencePath] = `_reference/childComponent/${name}`; + this[toReferencePath] = `_reference/childComponent/${_name}`; + } + + get exports(): ComponentDefinitionExports { + return createExports(this._name, []); } } +function createExports(name: string, pathParts: string[]): any { + const handler: ProxyHandler = { + get(_, prop: string | symbol) { + if (typeof prop === "string") { + const newParts = [...pathParts, prop]; + return createExports(name, newParts); + } else if (prop === toReferencePath) { + let reference = `_reference/childComponent/${name}`; + for (const part of pathParts) { + reference += `/${part}`; + } + return reference; + } else { + return undefined; + } + }, + }; + return new Proxy({}, handler); +} + function install>( this: CommonDefinitionData, definition: Definition, options: { name?: string; - args?: Infer>; + args?: Infer>; } = {}, ): InstalledComponent { // At runtime an imported component will have this shape. @@ -193,6 +244,46 @@ function install>( return new InstalledComponent(definition, name); } +function mount(this: CommonDefinitionData, exports: any) { + function visit(definition: CommonDefinitionData, path: string[], value: any) { + const valueReference = value[toReferencePath]; + if (valueReference) { + if (!path.length) { + throw new Error("Empty export path"); + } + let current = definition._exportTree; + for (const part of path.slice(0, -1)) { + let next = current[part]; + if (typeof next === "string") { + throw new Error( + `Mount path ${path.join(".")} collides with existing export`, + ); + } + if (!next) { + next = {}; + current[part] = next; + } + current = next; + } + const last = path[path.length - 1]; + if (current[last]) { + throw new Error( + `Mount path ${path.join(".")} collides with existing export`, + ); + } + current[last] = valueReference; + } else { + for (const [key, child] of Object.entries(value)) { + visit(definition, [...path, key], child); + } + } + } + if (exports[toReferencePath]) { + throw new Error(`Cannot mount another component's exports at the root`); + } + visit(this, [], exports); +} + function mountHttp( this: CommonDefinitionData, pathPrefix: string, @@ -231,10 +322,24 @@ function exportAppForAnalysis( definitionType, childComponents: childComponents as any, httpMounts: this._httpMounts, - exports: { type: "branch", branch: [] }, + exports: serializeExportTree(this._exportTree), }; } +function serializeExportTree(tree: ExportTree): any { + const branch: any[] = []; + for (const [key, child] of Object.entries(tree)) { + let node; + if (typeof child === "string") { + node = { type: "leaf", leaf: child }; + } else { + node = serializeExportTree(child); + } + branch.push([key, node]); + } + return { type: "branch", branch }; +} + function serializeChildComponents( childComponents: [string, ImportedComponentDefinition, Record][], ): { @@ -286,18 +391,20 @@ function exportComponentForAnalysis( args, }; const childComponents = serializeChildComponents(this._childComponents); - return { name: this._name, definitionType, childComponents: childComponents as any, httpMounts: this._httpMounts, - exports: { type: "branch", branch: [] }, + exports: serializeExportTree(this._exportTree), }; } // This is what is actually contained in a ComponentDefinition. -type RuntimeComponentDefinition = Exclude, "__args"> & +type RuntimeComponentDefinition = Omit< + ComponentDefinition, + "__args" | "__exports" +> & ComponentDefinitionData & { export: () => ComponentDefinitionAnalysis; }; @@ -310,25 +417,30 @@ type RuntimeAppDefinition = AppDefinition & * @internal */ // eslint-disable-next-line @typescript-eslint/ban-types -export function defineComponent( +export function defineComponent< + Args extends PropertyValidators = EmptyObject, + Exports extends ComponentExports = any, +>( name: string, options: { args?: Args } = {}, -): ComponentDefinition { +): ComponentDefinition { const ret: RuntimeComponentDefinition = { _isRoot: false, _name: name, _args: options.args || {}, _childComponents: [], _httpMounts: {}, + _exportTree: {}, export: exportComponentForAnalysis, install, + mount, mountHttp, // pretend to conform to ComponentDefinition, which temporarily expects __args - ...({} as { __args: any }), + ...({} as { __args: any; __exports: any }), }; - return ret as ComponentDefinition; + return ret as any as ComponentDefinition; } /** @@ -340,9 +452,11 @@ export function defineApp(): AppDefinition { _isRoot: true, _childComponents: [], _httpMounts: {}, + _exportTree: {}, export: exportAppForAnalysis, install, + mount, mountHttp, }; return ret as AppDefinition;