Skip to content

Commit

Permalink
Add OpProvider for sharing op implementations between isolate1 and is…
Browse files Browse the repository at this point in the history
…olate2 (#24639)

Pretty straightforward application of the provider trait approach for syscalls to ops.

GitOrigin-RevId: ee0520c63e52936f946f897e4cd1dfbf7b9e8fff
  • Loading branch information
sujayakar authored and Convex, Inc. committed Apr 14, 2024
1 parent 76f55b7 commit addaff2
Show file tree
Hide file tree
Showing 25 changed files with 1,139 additions and 998 deletions.
117 changes: 16 additions & 101 deletions crates/convex_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use syn::{
FnArg,
GenericArgument,
ItemFn,
Pat,
PathArguments,
PathSegment,
ReturnType,
Expand Down Expand Up @@ -159,7 +160,7 @@ pub fn instrument_future(_attr: TokenStream, item: TokenStream) -> TokenStream {
/// undefined), while both `null` and `undefined` (and missing positional)
/// arguments become None.
///
/// The function should be called as `scope.op_name(args, rt)?`.
/// The function should be called as `op_name(provider, args, rt)?`.
#[proc_macro_attribute]
pub fn v8_op(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ItemFn {
Expand All @@ -186,106 +187,14 @@ pub fn v8_op(_attr: TokenStream, item: TokenStream) -> TokenStream {
..
} = sig;

let Some(FnArg::Receiver(receiver)) = inputs.first() else {
panic!("op should take &mut self");
let Some(FnArg::Typed(first_pat_type)) = inputs.first() else {
panic!("op should take a first argument for its op provider");
};
let arg_parsing: TokenStream2 = inputs
.iter()
.enumerate()
.skip(1)
.map(|(idx, input)| {
let idx = idx as i32;
let FnArg::Typed(pat) = input else {
panic!("input must be typed")
};
let arg_info = format!("{} arg{}", ident, idx);
// NOTE: deno has special case when pat.ty is &mut [u8].
// While that would make some ops more efficient, it also makes them
// unsafe because it's hard to prove that the same buffer isn't
// being mutated from multiple ops in parallel or multiple arguments
// on the same op.
//
// Forego all special casing and just use serde_v8.
quote! {
let #pat = {
let __raw_arg = __args.get(#idx);
::deno_core::serde_v8::from_v8(self, __raw_arg).context(#arg_info)?
};
}
})
.collect();

let ReturnType::Type(_, return_type) = output else {
panic!("op needs return type");
};
let Type::Path(rtype_path) = &**return_type else {
panic!("op must return anyhow::Result<...>")
};
let PathSegment {
ident: retval_type,
arguments: retval_arguments,
} = rtype_path.path.segments.last().unwrap();
assert_eq!(&retval_type.to_string(), "Result");
let PathArguments::AngleBracketed(retval_arguments) = retval_arguments else {
panic!("op must return anyhow::Result<...>")
};
let GenericArgument::Type(_retval_type) = retval_arguments
.args
.last()
.expect("op must return anyhow::Result<...>")
else {
panic!("op must return anyhow::Result<...>");
let Pat::Ident(first_pat_ident) = &*first_pat_type.pat else {
panic!("op's first argument should be a plain identifier");
};
let serialize_retval = quote! {
let __value_v8 = deno_core::serde_v8::to_v8(self, __result_v)?;
__rv.set(__value_v8);
};

let gen = quote! {
#(#attrs)*
#vis fn #ident #generics (
#receiver,
__args: ::deno_core::v8::FunctionCallbackArguments,
mut __rv: ::deno_core::v8::ReturnValue,
) -> ::anyhow::Result<()> {
#arg_parsing
let __result_v = (|| #output { #block })()?;
{ #serialize_retval }
Ok(())
}
};
gen.into()
}

#[proc_macro_attribute]
pub fn v8_op2(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ItemFn {
ref attrs,
ref vis,
ref sig,
ref block,
} = syn::parse(item).unwrap();

assert!(sig.constness.is_none(), "const fn cannot be op");
assert!(sig.asyncness.is_none(), "async fn cannot be op");
assert!(sig.unsafety.is_none(), "unsafe fn cannot be op");
assert!(sig.abi.is_none(), "fn with explicit ABI cannot be op");
assert!(
sig.variadic.is_none(),
"fn with variadic arguments cannot be op"
);
let provider_ident = &first_pat_ident.ident;

let Signature {
ref ident,
ref generics,
ref inputs,
ref output,
..
} = sig;

let Some(FnArg::Receiver(receiver)) = inputs.first() else {
panic!("op should take &mut self");
};
let arg_parsing: TokenStream2 = inputs
.iter()
.enumerate()
Expand All @@ -306,7 +215,10 @@ pub fn v8_op2(_attr: TokenStream, item: TokenStream) -> TokenStream {
quote! {
let #pat = {
let __raw_arg = __args.get(#idx);
::deno_core::serde_v8::from_v8(self.scope, __raw_arg).context(#arg_info)?
::deno_core::serde_v8::from_v8(
OpProvider::scope(#provider_ident),
__raw_arg,
).context(#arg_info)?
};
}
})
Expand Down Expand Up @@ -334,14 +246,17 @@ pub fn v8_op2(_attr: TokenStream, item: TokenStream) -> TokenStream {
panic!("op must return anyhow::Result<...>");
};
let serialize_retval = quote! {
let __value_v8 = deno_core::serde_v8::to_v8(self.scope, __result_v)?;
let __value_v8 = deno_core::serde_v8::to_v8(
OpProvider::scope(#provider_ident),
__result_v,
)?;
__rv.set(__value_v8);
};

let gen = quote! {
#(#attrs)*
#vis fn #ident #generics (
#receiver,
#first_pat_type,
__args: ::deno_core::v8::FunctionCallbackArguments,
mut __rv: ::deno_core::v8::ReturnValue,
) -> ::anyhow::Result<()> {
Expand Down
4 changes: 1 addition & 3 deletions crates/isolate/src/environment/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,6 @@ impl<RT: Runtime> ActionEnvironment<RT> {
}

impl<RT: Runtime> IsolateEnvironment<RT> for ActionEnvironment<RT> {
type Rng = ChaCha12Rng;

fn trace(&mut self, level: LogLevel, messages: Vec<String>) -> anyhow::Result<()> {
// - 1 to reserve for the [ERROR] log line

Expand Down Expand Up @@ -1072,7 +1070,7 @@ impl<RT: Runtime> IsolateEnvironment<RT> for ActionEnvironment<RT> {
Ok(())
}

fn rng(&mut self) -> anyhow::Result<&mut Self::Rng> {
fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> {
self.phase.rng()
}

Expand Down
4 changes: 1 addition & 3 deletions crates/isolate/src/environment/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ pub struct AnalyzeEnvironment {
}

impl<RT: Runtime> IsolateEnvironment<RT> for AnalyzeEnvironment {
type Rng = ChaCha12Rng;

fn trace(&mut self, _level: LogLevel, messages: Vec<String>) -> anyhow::Result<()> {
tracing::warn!(
"Unexpected Console access at import time: {}",
Expand All @@ -146,7 +144,7 @@ impl<RT: Runtime> IsolateEnvironment<RT> for AnalyzeEnvironment {
Ok(())
}

fn rng(&mut self) -> anyhow::Result<&mut Self::Rng> {
fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> {
Ok(&mut self.rng)
}

Expand Down
4 changes: 1 addition & 3 deletions crates/isolate/src/environment/auth_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ pub struct AuthConfig {
}

impl<RT: Runtime> IsolateEnvironment<RT> for AuthConfigEnvironment {
type Rng = ChaCha12Rng;

fn trace(&mut self, _level: LogLevel, messages: Vec<String>) -> anyhow::Result<()> {
tracing::warn!(
"Unexpected Console access when evaluating auth config file: {}",
Expand All @@ -98,7 +96,7 @@ impl<RT: Runtime> IsolateEnvironment<RT> for AuthConfigEnvironment {
Ok(())
}

fn rng(&mut self) -> anyhow::Result<&mut Self::Rng> {
fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> {
anyhow::bail!(ErrorMetadata::bad_request(
"NoRandomDuringAuthConfig",
"Math.random unsupported when evaluating auth config file"
Expand Down
6 changes: 2 additions & 4 deletions crates/isolate/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use model::modules::module_versions::{
ModuleSource,
SourceMap,
};
use rand::Rng;
use rand_chacha::ChaCha12Rng;
use serde_json::Value as JsonValue;
use value::{
TableMapping,
Expand Down Expand Up @@ -62,8 +62,6 @@ use crate::{
/// Both ops and syscalls can return errors tagged with `DeveloperError` to
/// signal a user-visible error that will be turned into a JavaScript exception.
pub trait IsolateEnvironment<RT: Runtime>: 'static {
type Rng: Rng;

#[allow(async_fn_in_trait)]
async fn lookup_source(
&mut self,
Expand All @@ -87,7 +85,7 @@ pub trait IsolateEnvironment<RT: Runtime>: 'static {
messages: Vec<String>,
system_log_metadata: SystemLogMetadata,
) -> anyhow::Result<()>;
fn rng(&mut self) -> anyhow::Result<&mut Self::Rng>;
fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng>;
fn unix_timestamp(&self) -> anyhow::Result<UnixTimestamp>;

fn get_environment_variable(&mut self, name: EnvVarName)
Expand Down
4 changes: 1 addition & 3 deletions crates/isolate/src/environment/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ pub struct SchemaEnvironment {
}

impl<RT: Runtime> IsolateEnvironment<RT> for SchemaEnvironment {
type Rng = ChaCha12Rng;

fn trace(&mut self, _level: LogLevel, messages: Vec<String>) -> anyhow::Result<()> {
tracing::warn!(
"Unexpected Console access at schema evaluation time: {}",
Expand All @@ -95,7 +93,7 @@ impl<RT: Runtime> IsolateEnvironment<RT> for SchemaEnvironment {
Ok(())
}

fn rng(&mut self) -> anyhow::Result<&mut Self::Rng> {
fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> {
Ok(&mut self.rng)
}

Expand Down
4 changes: 1 addition & 3 deletions crates/isolate/src/environment/udf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ pub struct DatabaseUdfEnvironment<RT: Runtime> {
}

impl<RT: Runtime> IsolateEnvironment<RT> for DatabaseUdfEnvironment<RT> {
type Rng = ChaCha12Rng;

fn trace(&mut self, level: LogLevel, messages: Vec<String>) -> anyhow::Result<()> {
// - 1 to reserve for the [ERROR] log line
match self.log_lines.len().cmp(&(MAX_LOG_LINES - 1)) {
Expand Down Expand Up @@ -249,7 +247,7 @@ impl<RT: Runtime> IsolateEnvironment<RT> for DatabaseUdfEnvironment<RT> {
Ok(())
}

fn rng(&mut self) -> anyhow::Result<&mut Self::Rng> {
fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> {
self.phase.rng()
}

Expand Down
79 changes: 0 additions & 79 deletions crates/isolate/src/execution_scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,85 +524,6 @@ impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment<RT>> ExecutionScope<'a, 'b,
Ok(v8::Local::new(&mut scope, handle))
}

pub fn op(
&mut self,
args: v8::FunctionCallbackArguments,
rv: v8::ReturnValue,
) -> anyhow::Result<()> {
let _s = static_span!();
if args.length() < 1 {
// This must be a bug in our `udf-runtime` code, not a developer error.
anyhow::bail!("op(op_name, ...) takes at least one argument");
}
let op_name: v8::Local<v8::String> = args.get(0).try_into()?;
let op_name = to_rust_string(self, &op_name)?;

let timer = metrics::op_timer(&op_name);

match &op_name[..] {
"throwUncatchableDeveloperError" => self.op_throwUncatchableDeveloperError(args, rv)?,
"console/message" => self.op_console_message(args, rv)?,
"console/trace" => self.op_console_trace(args, rv)?,
"console/timeStart" => self.op_console_timeStart(args, rv)?,
"console/timeLog" => self.op_console_timeLog(args, rv)?,
"console/timeEnd" => self.op_console_timeEnd(args, rv)?,
"error/stack" => self.op_error_stack(args, rv)?,
"random" => self.op_random(args, rv)?,
"now" => self.op_now(args, rv)?,
"url/getUrlInfo" => self.op_url_getUrlInfo(args, rv)?,
"url/getUrlSearchParamPairs" => self.op_url_getUrlSearchParamPairs(args, rv)?,
"url/stringifyUrlSearchParams" => self.op_url_stringifyUrlSearchParams(args, rv)?,
"url/updateUrlInfo" => self.op_url_updateUrlInfo(args, rv)?,
"blob/createPart" => self.op_blob_createPart(args, rv)?,
"blob/slicePart" => self.op_blob_slicePart(args, rv)?,
"blob/readPart" => self.op_blob_readPart(args, rv)?,
"stream/create" => self.op_stream_create(args, rv)?,
"stream/extend" => self.op_stream_extend(args, rv)?,
"headers/getMimeType" => self.op_headers_getMimeType(args, rv)?,
"headers/normalizeName" => self.op_headers_normalizeName(args, rv)?,
"textEncoder/encode" => self.op_textEncoder_encode(args, rv)?,
"textEncoder/encodeInto" => self.op_textEncoder_encodeInto(args, rv)?,
"textEncoder/decode" => self.op_textEncoder_decode(args, rv)?,
"textEncoder/normalizeLabel" => self.op_textEncoder_normalizeLabel(args, rv)?,
"atob" => self.op_atob(args, rv)?,
"btoa" => self.op_btoa(args, rv)?,
"environmentVariables/get" => self.op_environmentVariables_get(args, rv)?,
"getTableMappingWithoutSystemTables" => {
self.op_getTableMappingWithoutSystemTables(args, rv)?
},
"validateArgs" => self.op_validate_args(args, rv)?,
"crypto/randomUUID" => self.op_crypto_randomUUID(args, rv)?,
"crypto/getRandomValues" => self.op_crypto_getRandomValues(args, rv)?,
"crypto/sign" => self.op_crypto_sign(args, rv)?,
"crypto/signEd25519" => self.op_crypto_sign_ed25519(args, rv)?,
"crypto/verify" => self.op_crypto_verify(args, rv)?,
"crypto/verifyEd25519" => self.op_crypto_verify_ed25519(args, rv)?,
"crypto/deriveBits" => self.op_crypto_deriveBits(args, rv)?,
"crypto/digest" => self.op_crypto_digest(args, rv)?,
"crypto/importKey" => self.op_crypto_importKey(args, rv)?,
"crypto/importSpkiEd25519" => self.op_crypto_import_spki_ed25519(args, rv)?,
"crypto/importPkcs8Ed25519" => self.op_crypto_import_pkcs8_ed25519(args, rv)?,
"crypto/importSpkiX25519" => self.op_crypto_import_spki_x25519(args, rv)?,
"crypto/importPkcs8X25519" => self.op_crypto_import_pkcs8_x25519(args, rv)?,
"crypto/base64UrlEncode" => self.op_crypto_base64_url_encode(args, rv)?,
"crypto/base64UrlDecode" => self.op_crypto_base64_url_decode(args, rv)?,
"crypto/exportKey" => self.op_crypto_export_key(args, rv)?,
"crypto/exportSpkiEd25519" => self.op_crypto_export_spki_ed25519(args, rv)?,
"crypto/exportPkcs8Ed25519" => self.op_crypto_export_pkcs8_ed25519(args, rv)?,
"crypto/JwkXEd25519" => self.op_crypto_jwk_x_ed25519(args, rv)?,
"crypto/exportSpkiX25519" => self.op_crypto_export_spki_x25519(args, rv)?,
"crypto/exportPkcs8X25519" => self.op_crypto_export_pkcs8_x25519(args, rv)?,
_ => {
anyhow::bail!(ErrorMetadata::bad_request(
"UnknownOperation",
format!("Unknown operation {op_name}")
));
},
};
timer.finish();
Ok(())
}

pub fn async_op(
&mut self,
args: v8::FunctionCallbackArguments,
Expand Down
20 changes: 10 additions & 10 deletions crates/isolate/src/isolate2/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use super::{
session::Session,
FunctionId,
};
use crate::strings;
use crate::{
ops::run_op,
strings,
};

// Each isolate session can have multiple contexts, which we'll eventually use
// for subtransactions. Each context executes with a particular environment,
Expand Down Expand Up @@ -183,15 +186,12 @@ impl Context {
rv: v8::ReturnValue,
) {
let mut ctx = EnteredContext::from_callback(scope);
match ctx.op(args, rv) {
Ok(()) => (),
Err(e) => {
// XXX: Handle syscall or op error.
// let message = strings::syscallError.create(scope).unwrap();
// let exception = v8::Exception::error(scope, message);
// scope.throw_exception(exception);
panic!("Unexpected error: {e:?}");
},
if let Err(e) = run_op(&mut ctx, args, rv) {
// XXX: Handle syscall or op error.
// let message = strings::syscallError.create(scope).unwrap();
// let exception = v8::Exception::error(scope, message);
// scope.throw_exception(exception);
panic!("Unexpected error: {e:?}");
}
}

Expand Down
Loading

0 comments on commit addaff2

Please sign in to comment.