Skip to content

Commit

Permalink
Resubmit export PRs (#28698)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 6f5d9a5e47e4aa4552ccb4fd1a170f43ba118a09
  • Loading branch information
sujayakar authored and Convex, Inc. committed Aug 4, 2024
1 parent a2eef7f commit a01d010
Show file tree
Hide file tree
Showing 16 changed files with 525 additions and 204 deletions.
151 changes: 140 additions & 11 deletions crates/application/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use common::{
components::{
CanonicalizedComponentFunctionPath,
ComponentId,
ComponentPath,
ExportPath,
},
http::ResolvedHost,
pause::PauseClient,
Expand Down Expand Up @@ -96,7 +98,25 @@ pub trait ApplicationApi: Send + Sync {
auth_token: AuthenticationToken,
) -> anyhow::Result<Identity>;

/// Execute a public query on the root app. This method is used by the sync
/// worker and HTTP API for the majority of traffic as the main entry point
/// for queries.
async fn execute_public_query(
&self,
host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: ExportPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
ts: ExecuteQueryTimestamp,
journal: Option<SerializedQueryJournal>,
) -> anyhow::Result<RedactedQueryReturn>;

/// Execute an admin query for a particular component. This method is used
/// by the sync worker for running queries for the dashboard and only works
/// for admin or system identity.
async fn execute_admin_query(
&self,
host: &ResolvedHost,
request_id: RequestId,
Expand All @@ -108,44 +128,54 @@ pub trait ApplicationApi: Send + Sync {
journal: Option<SerializedQueryJournal>,
) -> anyhow::Result<RedactedQueryReturn>;

/// Execute a public mutation on the root app.
async fn execute_public_mutation(
&self,
host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
path: ExportPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
// Identifier used to make this mutation idempotent.
mutation_identifier: Option<SessionRequestIdentifier>,
) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>>;

async fn execute_public_action(
/// Execute an admin mutation for a particular component for the dashboard.
async fn execute_admin_mutation(
&self,
host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>;
mutation_identifier: Option<SessionRequestIdentifier>,
) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>>;

async fn execute_any_function(
/// Execute a public action on the root app.
async fn execute_public_action(
&self,
host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
path: ExportPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
) -> anyhow::Result<Result<FunctionReturn, FunctionError>>;
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>;

async fn latest_timestamp(
/// Execute an admin action for a particular component for the dashboard.
async fn execute_admin_action(
&self,
host: &ResolvedHost,
request_id: RequestId,
) -> anyhow::Result<RepeatableTimestamp>;
identity: Identity,
path: CanonicalizedComponentFunctionPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>;

/// Execute an HTTP action on the root app.
async fn execute_http_action(
&self,
host: &ResolvedHost,
Expand All @@ -156,6 +186,25 @@ pub trait ApplicationApi: Send + Sync {
response_streamer: HttpActionResponseStreamer,
) -> anyhow::Result<()>;

/// For the dashboard (and the CLI), run any function in any component
/// without knowing its type. This function requires admin identity for
/// calling functions outside the root component.
async fn execute_any_function(
&self,
host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
) -> anyhow::Result<Result<FunctionReturn, FunctionError>>;

async fn latest_timestamp(
&self,
host: &ResolvedHost,
request_id: RequestId,
) -> anyhow::Result<RepeatableTimestamp>;

async fn check_store_file_authorization(
&self,
host: &ResolvedHost,
Expand Down Expand Up @@ -226,7 +275,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
_host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
path: ExportPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
ts: ExecuteQueryTimestamp,
Expand All @@ -236,7 +285,34 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
caller.allowed_visibility() == AllowedVisibility::PublicOnly,
"This method should not be used by internal callers."
);
let ts = match ts {
ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(),
ExecuteQueryTimestamp::At(ts) => ts,
};
// Public queries always start with the root component.
let path = CanonicalizedComponentFunctionPath {
component: ComponentPath::root(),
udf_path: path.into(),
};
self.read_only_udf_at_ts(request_id, path, args, identity, ts, journal, caller)
.await
}

async fn execute_admin_query(
&self,
_host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
ts: ExecuteQueryTimestamp,
journal: Option<SerializedQueryJournal>,
) -> anyhow::Result<RedactedQueryReturn> {
anyhow::ensure!(
path.component.is_root() || identity.is_admin() || identity.is_system(),
"Only admin or system users can call functions on non-root components directly"
);
let ts = match ts {
ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(),
ExecuteQueryTimestamp::At(ts) => ts,
Expand All @@ -250,7 +326,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
_host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
path: ExportPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
// Identifier used to make this mutation idempotent.
Expand All @@ -260,7 +336,36 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
caller.allowed_visibility() == AllowedVisibility::PublicOnly,
"This method should not be used by internal callers."
);
let path = CanonicalizedComponentFunctionPath {
component: ComponentPath::root(),
udf_path: path.into(),
};
self.mutation_udf(
request_id,
path,
args,
identity,
mutation_identifier,
caller,
PauseClient::new(),
)
.await
}

async fn execute_admin_mutation(
&self,
_host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
mutation_identifier: Option<SessionRequestIdentifier>,
) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>> {
anyhow::ensure!(
path.component.is_root() || identity.is_admin() || identity.is_system(),
"Only admin or system users can call functions on non-root components directly"
);
self.mutation_udf(
request_id,
path,
Expand All @@ -278,15 +383,35 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
_host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
path: ExportPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> {
anyhow::ensure!(
caller.allowed_visibility() == AllowedVisibility::PublicOnly,
"This method should not be used by internal callers."
);
let path = CanonicalizedComponentFunctionPath {
component: ComponentPath::root(),
udf_path: path.into(),
};
self.action_udf(request_id, path, args, identity, caller)
.await
}

async fn execute_admin_action(
&self,
_host: &ResolvedHost,
request_id: RequestId,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
args: Vec<JsonValue>,
caller: FunctionCaller,
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> {
anyhow::ensure!(
path.component.is_root() || identity.is_admin() || identity.is_system(),
"Only admin or system users can call functions on non-root components directly"
);
self.action_udf(request_id, path, args, identity, caller)
.await
}
Expand All @@ -300,6 +425,10 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
args: Vec<JsonValue>,
caller: FunctionCaller,
) -> anyhow::Result<Result<FunctionReturn, FunctionError>> {
anyhow::ensure!(
path.component.is_root() || identity.is_admin() || identity.is_system(),
"Only admin or system users can call functions on non-root components directly"
);
self.any_udf(request_id, path, args, identity, caller).await
}

Expand Down
6 changes: 3 additions & 3 deletions crates/application/src/application_function_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) {
anyhow::bail!(unauthorized_error("mutation"));
}
let arguments = match parse_udf_args(&path, arguments) {
let arguments = match parse_udf_args(&path.udf_path, arguments) {
Ok(arguments) => arguments,
Err(error) => {
return Ok(Err(MutationError {
Expand Down Expand Up @@ -1032,7 +1032,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) {
anyhow::bail!(unauthorized_error("action"));
}
let arguments = match parse_udf_args(&path, arguments) {
let arguments = match parse_udf_args(&path.udf_path, arguments) {
Ok(arguments) => arguments,
Err(error) => {
return Ok(Err(ActionError {
Expand Down Expand Up @@ -1673,7 +1673,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) {
anyhow::bail!(unauthorized_error("query"));
}
let args = match parse_udf_args(&path, args) {
let args = match parse_udf_args(&path.udf_path, args) {
Ok(arguments) => arguments,
Err(js_error) => {
return Ok(QueryReturn {
Expand Down
2 changes: 1 addition & 1 deletion crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2575,7 +2575,7 @@ impl<RT: Runtime> Application<RT> {
component: ComponentPath::TODO(),
udf_path: CanonicalizedUdfPath::new(module_path, function_name),
};
let arguments = parse_udf_args(&path, args)?;
let arguments = parse_udf_args(&path.udf_path, args)?;
let (result, log_lines) = match analyzed_function.udf_type {
UdfType::Query => self
.runner
Expand Down
2 changes: 1 addition & 1 deletion crates/application/src/tests/cron_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn create_cron_job(
};
let cron_spec = CronSpec {
udf_path: path.udf_path.clone(),
udf_args: parse_udf_args(&path, vec![JsonValue::Object(map)])?,
udf_args: parse_udf_args(&path.udf_path, vec![JsonValue::Object(map)])?,
cron_schedule: CronSchedule::Interval { seconds: 60 },
};
let original_jobs = cron_model.list().await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/application/src/tests/scheduled_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn create_scheduled_job<'a>(
let job_id = model
.schedule(
path.clone(),
parse_udf_args(&path, vec![JsonValue::Object(map)])?,
parse_udf_args(&path.udf_path, vec![JsonValue::Object(map)])?,
rt.unix_timestamp(),
ExecutionContext::new_for_test(),
)
Expand Down
34 changes: 34 additions & 0 deletions crates/common/src/components/function_paths.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::str::FromStr;

use serde::{
Deserialize,
Serialize,
Expand Down Expand Up @@ -113,3 +115,35 @@ impl HeapSize for CanonicalizedComponentFunctionPath {
self.component.heap_size() + self.udf_path.heap_size()
}
}

pub struct ExportPath {
path: CanonicalizedUdfPath,
}

impl From<CanonicalizedUdfPath> for ExportPath {
fn from(path: CanonicalizedUdfPath) -> Self {
Self { path }
}
}

impl From<ExportPath> for CanonicalizedUdfPath {
fn from(p: ExportPath) -> Self {
p.path
}
}

impl FromStr for ExportPath {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self {
path: CanonicalizedUdfPath::from_str(s)?,
})
}
}

impl From<ExportPath> for String {
fn from(p: ExportPath) -> Self {
p.path.to_string()
}
}
1 change: 1 addition & 0 deletions crates/common/src/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use self::{
CanonicalizedComponentFunctionPath,
ComponentDefinitionFunctionPath,
ComponentFunctionPath,
ExportPath,
},
module_paths::CanonicalizedComponentModulePath,
reference::Reference,
Expand Down
4 changes: 2 additions & 2 deletions crates/isolate/src/environment/helpers/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub async fn validate_schedule_args<RT: Runtime>(
}

// We do serialize the arguments, so this is likely our fault.
let udf_args = parse_udf_args(&path, udf_args)?;
let udf_args = parse_udf_args(&path.udf_path, udf_args)?;

// Even though we might use different version of modules when executing,
// we do validate that the scheduled function exists at time of scheduling.
Expand Down Expand Up @@ -435,7 +435,7 @@ impl ValidatedPathAndArgs {
))));
}

match validate_udf_args_size(&path, &args) {
match validate_udf_args_size(&path.udf_path, &args) {
Ok(()) => (),
Err(err) => return Ok(Err(err)),
}
Expand Down
Loading

0 comments on commit a01d010

Please sign in to comment.