Skip to content

Commit

Permalink
function handles in node actions (#30554)
Browse files Browse the repository at this point in the history
support `createFunctionHandle` syscall and also function handles in `run*` and `scheduler` in node actions.

GitOrigin-RevId: 82091658169f9f702faa50cf75e4b11a231ecc28
  • Loading branch information
ldanilek authored and Convex, Inc. committed Oct 10, 2024
1 parent 7642b5a commit 46af65e
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 10 deletions.
11 changes: 11 additions & 0 deletions crates/application/src/application_function_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2100,4 +2100,15 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
let mut tx = self.database.begin(identity).await?;
FunctionHandlesModel::new(&mut tx).lookup(handle).await
}

async fn create_function_handle(
&self,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
) -> anyhow::Result<FunctionHandle> {
let mut tx = self.database.begin(identity).await?;
FunctionHandlesModel::new(&mut tx)
.get_with_component_path(path)
.await
}
}
5 changes: 5 additions & 0 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2817,7 +2817,12 @@ impl<RT: Runtime> Application<RT> {
component_id: ComponentId,
path: Option<String>,
reference: Option<String>,
function_handle: Option<String>,
) -> anyhow::Result<CanonicalizedComponentFunctionPath> {
if let Some(function_handle) = function_handle {
let handle = function_handle.parse()?;
return self.lookup_function_handle(identity, handle).await;
}
let reference = match (path, reference) {
(None, None) => anyhow::bail!(ErrorMetadata::bad_request(
"MissingUdfPathOrFunctionReference",
Expand Down
5 changes: 5 additions & 0 deletions crates/isolate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ pub trait ActionCallbacks: Send + Sync {
identity: Identity,
handle: FunctionHandle,
) -> anyhow::Result<CanonicalizedComponentFunctionPath>;
async fn create_function_handle(
&self,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
) -> anyhow::Result<FunctionHandle>;
}

pub struct UdfRequest<RT: Runtime> {
Expand Down
2 changes: 2 additions & 0 deletions crates/isolate/src/environment/action/async_syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ impl<RT: Runtime> TaskExecutor<RT> {
self.resolve_function(&reference)?
},
};
// TODO(lee) remove preloaded function handles and call action callback instead,
// after the callback is deployed to backend & usher.
let handle = {
let function_handles = self.function_handles.lock();
function_handles.get(&function_path).cloned()
Expand Down
4 changes: 1 addition & 3 deletions crates/isolate/src/environment/udf/async_syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,8 @@ impl<RT: Runtime> AsyncSyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
path: CanonicalizedComponentFunctionPath,
) -> anyhow::Result<FunctionHandle> {
let tx = self.phase.tx()?;
let (_, component) =
BootstrapComponentsModel::new(tx).must_component_path_to_ids(&path.component)?;
FunctionHandlesModel::new(tx)
.get(component, path.udf_path)
.get_with_component_path(path)
.await
}

Expand Down
11 changes: 11 additions & 0 deletions crates/isolate/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,17 @@ impl<RT: Runtime, P: Persistence + Clone> ActionCallbacks for UdfTest<RT, P> {
let mut tx = self.database.begin(identity).await?;
FunctionHandlesModel::new(&mut tx).lookup(handle).await
}

async fn create_function_handle(
&self,
identity: Identity,
path: CanonicalizedComponentFunctionPath,
) -> anyhow::Result<FunctionHandle> {
let mut tx = self.database.begin(identity).await?;
FunctionHandlesModel::new(&mut tx)
.get_with_component_path(path)
.await
}
}

/// Create a bogus UDF request for testing. Should only be used for tests
Expand Down
79 changes: 75 additions & 4 deletions crates/local_backend/src/node_action_callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ use crate::{
pub struct NodeCallbackUdfPostRequest {
pub path: Option<String>,
pub reference: Option<String>,
pub function_handle: Option<String>,
pub args: UdfArgsJson,

pub format: Option<String>,
Expand All @@ -111,7 +112,13 @@ pub async fn internal_query_post(
) -> Result<impl IntoResponse, HttpResponseError> {
let path = st
.application
.canonicalized_function_path(identity.clone(), component_id, req.path, req.reference)
.canonicalized_function_path(
identity.clone(),
component_id,
req.path,
req.reference,
req.function_handle,
)
.await?;
let udf_return = st
.application
Expand Down Expand Up @@ -159,7 +166,13 @@ pub async fn internal_mutation_post(
) -> Result<impl IntoResponse, HttpResponseError> {
let path = st
.application
.canonicalized_function_path(identity.clone(), component_id, req.path, req.reference)
.canonicalized_function_path(
identity.clone(),
component_id,
req.path,
req.reference,
req.function_handle,
)
.await?;
let udf_result = st
.application
Expand Down Expand Up @@ -212,7 +225,13 @@ pub async fn internal_action_post(
) -> Result<impl IntoResponse, HttpResponseError> {
let path = st
.application
.canonicalized_function_path(identity.clone(), component_id, req.path, req.reference)
.canonicalized_function_path(
identity.clone(),
component_id,
req.path,
req.reference,
req.function_handle,
)
.await?;
let udf_result = st
.application
Expand Down Expand Up @@ -249,6 +268,7 @@ pub async fn internal_action_post(
#[serde(rename_all = "camelCase")]
pub struct ScheduleJobRequest {
reference: Option<String>,
function_handle: Option<String>,
udf_path: Option<String>,
udf_args: UdfArgsJson,
scheduled_ts: f64,
Expand All @@ -274,7 +294,13 @@ pub async fn schedule_job(
// User might have entered an invalid path, so this is a developer error.
let path = st
.application
.canonicalized_function_path(identity.clone(), component_id, req.udf_path, req.reference)
.canonicalized_function_path(
identity.clone(),
component_id,
req.udf_path,
req.reference,
req.function_handle,
)
.await
.map_err(|e| {
anyhow::anyhow!(ErrorMetadata::bad_request("InvalidUdfPath", e.to_string()))
Expand Down Expand Up @@ -323,6 +349,51 @@ pub async fn cancel_developer_job(
Ok(Json(json!(null)))
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateFunctionHandleRequest {
udf_path: Option<String>,
reference: Option<String>,
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateFunctionHandleResponse {
handle: String,
}

#[debug_handler]
pub async fn create_function_handle(
State(st): State<LocalAppState>,
ExtractActionIdentity {
identity,
component_id,
}: ExtractActionIdentity,
Json(req): Json<CreateFunctionHandleRequest>,
) -> Result<impl IntoResponse, HttpResponseError> {
let path = st
.application
.canonicalized_function_path(
identity.clone(),
component_id,
req.udf_path,
req.reference,
None,
)
.await
.map_err(|e| {
anyhow::anyhow!(ErrorMetadata::bad_request("InvalidUdfPath", e.to_string()))
})?;
let handle = st
.application
.runner()
.create_function_handle(identity, path)
.await?;
Ok(Json(CreateFunctionHandleResponse {
handle: String::from(handle),
}))
}

#[debug_handler]
pub async fn vector_search(
State(st): State<LocalAppState>,
Expand Down
2 changes: 2 additions & 0 deletions crates/local_backend/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use crate::{
node_action_callbacks::{
action_callbacks_middleware,
cancel_developer_job,
create_function_handle,
internal_action_post,
internal_mutation_post,
internal_query_post,
Expand Down Expand Up @@ -271,6 +272,7 @@ where
.route("/schedule_job", post(schedule_job))
.route("/vector_search", post(vector_search))
.route("/cancel_job", post(cancel_developer_job))
.route("/create_function_handle", post(create_function_handle))
// file storage endpoints
.route("/storage_generate_upload_url", post(storage_generate_upload_url))
.route("/storage_get_url", post(storage_get_url))
Expand Down
15 changes: 15 additions & 0 deletions crates/model/src/components/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ impl<'a, RT: Runtime> FunctionHandlesModel<'a, RT> {
Ok(handles)
}

pub async fn get_with_component_path(
&mut self,
path: CanonicalizedComponentFunctionPath,
) -> anyhow::Result<FunctionHandle> {
let Some((_, component_id)) =
BootstrapComponentsModel::new(self.tx).component_path_to_ids(&path.component)?
else {
anyhow::bail!(ErrorMetadata::bad_request(
"ComponentNotFound",
"Component not found"
));
};
self.get(component_id, path.udf_path).await
}

pub async fn get(
&mut self,
component: ComponentId,
Expand Down
6 changes: 5 additions & 1 deletion npm-packages/convex/src/server/components/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { PropertyValidators, convexToJson } from "../../values/index.js";
import { version } from "../../index.js";
import {
AnyFunctionReference,
FunctionReference,
Expand Down Expand Up @@ -63,7 +64,10 @@ export async function createFunctionHandle<
>,
): Promise<FunctionHandle<Type, Args, ReturnType>> {
const address = getFunctionAddress(functionReference);
return await performAsyncSyscall("1.0/createFunctionHandle", { ...address });
return await performAsyncSyscall("1.0/createFunctionHandle", {
...address,
version,
});
}

interface ComponentExports {
Expand Down
43 changes: 41 additions & 2 deletions npm-packages/node-executor/src/syscalls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const STATUS_CODE_UDF_FAILED = 560;
const runFunctionArgs = z.object({
name: z.optional(z.string()),
reference: z.optional(z.string()),
functionHandle: z.optional(z.string()),
args: z.any(),
version: z.string(),
});
Expand All @@ -35,6 +36,7 @@ const runFunctionReturn = z.union([
const scheduleSchema = z.object({
name: z.optional(z.string()),
reference: z.optional(z.string()),
functionHandle: z.optional(z.string()),
ts: z.number(),
args: z.any(),
version: z.string(),
Expand Down Expand Up @@ -174,10 +176,13 @@ export class SyscallsImpl {
jsonArgs: string,
argValidator: ArgValidator,
operationName: string,
requireRequestId: boolean = true,
): z.infer<ArgValidator> {
const args = JSON.parse(jsonArgs);
// TODO(CX-5733): Rename requestId to lambdaExecuteId in callers and here.
this.validateLambdaExecuteId(args.requestId);
if (requireRequestId) {
// TODO(CX-5733): Rename requestId to lambdaExecuteId in callers and here.
this.validateLambdaExecuteId(args.requestId);
}
delete args.requestId;
try {
const parsedArgs = argValidator.parse(args);
Expand Down Expand Up @@ -308,6 +313,10 @@ export class SyscallsImpl {
return JSON.stringify(await this.syscallStorageGetMetadata(jsonArgs));
case "1.0/storageDelete":
return JSON.stringify(await this.syscallStorageDelete(jsonArgs));
case "1.0/createFunctionHandle":
return JSON.stringify(
await this.syscallCreateFunctionHandle(jsonArgs),
);
default:
throw new Error(`Unknown operation ${op}`);
}
Expand Down Expand Up @@ -361,6 +370,7 @@ export class SyscallsImpl {
body: {
path: queryArgs.name,
reference: queryArgs.reference,
functionHandle: queryArgs.functionHandle,
args: queryArgs.args,
},
path: "/api/actions/query",
Expand Down Expand Up @@ -402,6 +412,7 @@ export class SyscallsImpl {
body: {
path: mutationArgs.name,
reference: mutationArgs.reference,
functionHandle: mutationArgs.functionHandle,
args: mutationArgs.args,
},
path: "/api/actions/mutation",
Expand Down Expand Up @@ -443,6 +454,7 @@ export class SyscallsImpl {
body: {
path: actionArgs.name,
reference: actionArgs.reference,
functionHandle: actionArgs.functionHandle,
args: actionArgs.args,
},
path: "/api/actions/action",
Expand Down Expand Up @@ -503,6 +515,7 @@ export class SyscallsImpl {
version: scheduleArgs.version,
body: {
reference: scheduleArgs.reference,
functionHandle: scheduleArgs.functionHandle,
udfPath: scheduleArgs.name,
udfArgs: scheduleArgs.args,
scheduledTs: scheduleArgs.ts,
Expand Down Expand Up @@ -681,6 +694,32 @@ export class SyscallsImpl {
const getResult = await fetch(getUrl);
return await getResult.blob();
}

async syscallCreateFunctionHandle(rawArgs: string): Promise<JSONValue> {
const createFunctionHandleArgs = z.object({
name: z.optional(z.string()),
reference: z.optional(z.string()),
version: z.string(),
});
const operationName = "create function handle";
const args = this.validateArgs(
rawArgs,
createFunctionHandleArgs,
operationName,
false,
);
const { handle } = await this.actionCallback({
version: args.version,
body: {
udfPath: args.name,
reference: args.reference,
},
path: "/api/actions/create_function_handle",
operationName,
responseValidator: z.any(),
});
return handle;
}
}

function forwardErrorData(errorData: JSONValue, error: ConvexError<string>) {
Expand Down

0 comments on commit 46af65e

Please sign in to comment.