From 4c52b745a5a608e5fa44e5d347c03204341aa852 Mon Sep 17 00:00:00 2001 From: Sujay Jayakar Date: Wed, 17 Apr 2024 18:58:08 -0400 Subject: [PATCH] Wire up queries to isolate2 (#24784) GitOrigin-RevId: 10d1b67cdd8ff9ba38822d8563ae17beb0af1382 --- .../src/environment/udf/async_syscall.rs | 59 ++- crates/isolate/src/environment/udf/syscall.rs | 65 +-- .../isolate/src/isolate2/callback_context.rs | 444 ++++++++++++++++++ crates/isolate/src/isolate2/client.rs | 15 +- crates/isolate/src/isolate2/context.rs | 75 +-- .../isolate/src/isolate2/entered_context.rs | 346 +------------- crates/isolate/src/isolate2/mod.rs | 1 + crates/isolate/src/isolate2/runner.rs | 259 ++++++++-- crates/isolate/src/isolate2/session.rs | 41 +- crates/isolate/src/tests/basic.rs | 12 +- 10 files changed, 776 insertions(+), 541 deletions(-) create mode 100644 crates/isolate/src/isolate2/callback_context.rs diff --git a/crates/isolate/src/environment/udf/async_syscall.rs b/crates/isolate/src/environment/udf/async_syscall.rs index 930e5fc7..89037b96 100644 --- a/crates/isolate/src/environment/udf/async_syscall.rs +++ b/crates/isolate/src/environment/udf/async_syscall.rs @@ -23,6 +23,7 @@ use common::{ }, types::PersistenceVersion, value::ConvexValue, + version::Version, }; use database::{ query::{ @@ -76,6 +77,7 @@ use crate::{ ArgName, }, helpers::UdfArgsJson, + isolate2::client::QueryId, metrics::async_syscall_timer, }; @@ -214,6 +216,14 @@ impl QueryManager { } } +pub enum ManagedQuery { + Pending { + query: Query, + version: Option, + }, + Active(DeveloperQuery), +} + // Trait for allowing code reuse between `DatabaseUdfEnvironment` and isolate2. #[allow(async_fn_in_trait)] pub trait AsyncSyscallProvider { @@ -229,7 +239,9 @@ pub trait AsyncSyscallProvider { fn log_async_syscall(&mut self, name: String, duration: Duration, is_success: bool); - fn query_manager(&mut self) -> &mut QueryManager; + fn take_query(&mut self, query_id: QueryId) -> Option>; + fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery); + fn cleanup_query(&mut self, query_id: QueryId) -> bool; fn prev_journal(&mut self) -> &mut QueryJournal; fn next_journal(&mut self) -> &mut QueryJournal; @@ -291,8 +303,18 @@ impl AsyncSyscallProvider for DatabaseUdfEnvironment { .log_async_syscall(name, duration, is_success); } - fn query_manager(&mut self) -> &mut QueryManager { - &mut self.query_manager + fn take_query(&mut self, query_id: QueryId) -> Option> { + self.query_manager + .take_developer(query_id) + .map(ManagedQuery::Active) + } + + fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery) { + self.query_manager.insert_developer(query_id, query); + } + + fn cleanup_query(&mut self, query_id: QueryId) -> bool { + self.query_manager.cleanup_developer(query_id) } fn prev_journal(&mut self) -> &mut QueryJournal { @@ -707,13 +729,24 @@ impl> DatabaseSyscallsV1 { let args: QueryStreamNextArgs = serde_json::from_value(args)?; Ok(args.query_id) })?; - let local_query = provider - .query_manager() - .take_developer(query_id) - .context(ErrorMetadata::not_found( - "QueryNotFound", - "in-progress query not found", - ))?; + let managed_query = + provider + .take_query(query_id) + .context(ErrorMetadata::not_found( + "QueryNotFound", + "in-progress query not found", + ))?; + let local_query = match managed_query { + ManagedQuery::Pending { query, version } => { + DeveloperQuery::new_with_version( + provider.tx()?, + query, + version, + table_filter, + )? + }, + ManagedQuery::Active(local_query) => local_query, + }; Some((Some(query_id), local_query)) }, AsyncRead::Get(args) => { @@ -790,9 +823,7 @@ impl> DatabaseSyscallsV1 { for (batch_key, (query_id, local_query)) in queries_to_fetch { let result: anyhow::Result<_> = try { if let Some(query_id) = query_id { - provider - .query_manager() - .insert_developer(query_id, local_query); + provider.insert_query(query_id, local_query); } let maybe_next = fetch_results .remove(&batch_key) @@ -806,7 +837,7 @@ impl> DatabaseSyscallsV1 { if let Some(query_id) = query_id { if done { - provider.query_manager().cleanup_developer(query_id); + provider.cleanup_query(query_id); } serde_json::to_value(QueryStreamNextResult { value: value.into(), diff --git a/crates/isolate/src/environment/udf/syscall.rs b/crates/isolate/src/environment/udf/syscall.rs index 9f44d7b2..0dca879c 100644 --- a/crates/isolate/src/environment/udf/syscall.rs +++ b/crates/isolate/src/environment/udf/syscall.rs @@ -5,11 +5,11 @@ use common::{ query::Query, runtime::Runtime, static_span, + version::Version, }; use database::{ query::TableFilter, DeveloperQuery, - Transaction, }; use errors::ErrorMetadata; use serde::{ @@ -23,13 +23,12 @@ use serde_json::{ use value::{ id_v6::DocumentIdV6, InternalId, + TableIdAndTableNumber, TableName, + TableNumber, }; -use super::{ - async_syscall::QueryManager, - DatabaseUdfEnvironment, -}; +use super::DatabaseUdfEnvironment; use crate::environment::helpers::{ parse_version, with_argument_error, @@ -38,8 +37,12 @@ use crate::environment::helpers::{ pub trait SyscallProvider { fn table_filter(&self) -> TableFilter; - fn query_manager(&mut self) -> &mut QueryManager; - fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata>; + + fn lookup_table(&mut self, name: &TableName) -> anyhow::Result>; + fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result>; + + fn start_query(&mut self, query: Query, version: Option) -> anyhow::Result; + fn cleanup_query(&mut self, query_id: u32) -> bool; } impl SyscallProvider for DatabaseUdfEnvironment { @@ -51,12 +54,29 @@ impl SyscallProvider for DatabaseUdfEnvironment { } } - fn query_manager(&mut self) -> &mut QueryManager { - &mut self.query_manager + fn lookup_table(&mut self, name: &TableName) -> anyhow::Result> { + let table_mapping = self.phase.tx()?.table_mapping(); + Ok(table_mapping.id_and_number_if_exists(name)) + } + + fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result> { + let virtual_table_mapping = self.phase.tx()?.virtual_table_mapping(); + Ok(virtual_table_mapping.number_if_exists(name)) + } + + fn start_query(&mut self, query: Query, version: Option) -> anyhow::Result { + let table_filter = self.table_filter(); + let tx = self.phase.tx()?; + // TODO: Are all invalid query pipelines developer errors? These could be bugs + // in convex/server. + let compiled_query = + { DeveloperQuery::new_with_version(tx, query, version, table_filter)? }; + let query_id = self.query_manager.put_developer(compiled_query); + Ok(query_id) } - fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata> { - self.phase.tx() + fn cleanup_query(&mut self, query_id: u32) -> bool { + self.query_manager.cleanup_developer(query_id) } } @@ -111,18 +131,11 @@ fn syscall_normalize_id>( let table_name: TableName = args.table.parse().context(ArgName("table"))?; Ok((table_name, args.id_string)) })?; - let virtual_table_number = provider - .tx()? - .virtual_table_mapping() - .number_if_exists(&table_name); + let virtual_table_number = provider.lookup_virtual_table(&table_name)?; let table_number = match virtual_table_number { Some(table_number) => Some(table_number), None => { - let physical_table_number = provider - .tx()? - .table_mapping() - .id_and_number_if_exists(&table_name) - .map(|t| t.table_number); + let physical_table_number = provider.lookup_table(&table_name)?.map(|t| t.table_number); match provider.table_filter() { TableFilter::IncludePrivateSystemTables => physical_table_number, TableFilter::ExcludePrivateSystemTables if table_name.is_system() => None, @@ -155,9 +168,7 @@ fn syscall_query_stream>( provider: &mut P, args: JsonValue, ) -> anyhow::Result { - let _s: common::tracing::NoopSpan = static_span!(); - let table_filter = provider.table_filter(); - let tx = provider.tx()?; + let _s = static_span!(); #[derive(Deserialize)] struct QueryStreamArgs { @@ -170,11 +181,7 @@ fn syscall_query_stream>( let version = parse_version(args.version)?; Ok((parsed_query, version)) })?; - // TODO: Are all invalid query pipelines developer errors? These could be bugs - // in convex/server. - let compiled_query = - { DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? }; - let query_id = provider.query_manager().put_developer(compiled_query); + let query_id = provider.start_query(parsed_query, version)?; #[derive(Serialize)] #[serde(rename_all = "camelCase")] @@ -197,6 +204,6 @@ fn syscall_query_cleanup>( } let args: QueryCleanupArgs = with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?; - let cleaned_up = provider.query_manager().cleanup_developer(args.query_id); + let cleaned_up = provider.cleanup_query(args.query_id); Ok(serde_json::to_value(cleaned_up)?) } diff --git a/crates/isolate/src/isolate2/callback_context.rs b/crates/isolate/src/isolate2/callback_context.rs new file mode 100644 index 00000000..f1abec92 --- /dev/null +++ b/crates/isolate/src/isolate2/callback_context.rs @@ -0,0 +1,444 @@ +use anyhow::anyhow; +use deno_core::v8; +use errors::{ + ErrorMetadata, + ErrorMetadataAnyhowExt, +}; +use serde_json::Value as JsonValue; + +use super::{ + client::PendingAsyncSyscall, + context_state::ContextState, +}; +use crate::{ + environment::UncatchableDeveloperError, + helpers::{ + self, + to_rust_string, + }, + ops::run_op, +}; + +pub struct CallbackContext<'callback, 'scope: 'callback> { + pub scope: &'callback mut v8::HandleScope<'scope>, + context: v8::Local<'scope, v8::Context>, +} + +impl<'callback, 'scope> CallbackContext<'callback, 'scope> { + fn new(scope: &'callback mut v8::HandleScope<'scope>) -> Self { + let context = scope.get_current_context(); + Self { scope, context } + } + + pub fn context_state(&mut self) -> anyhow::Result<&mut ContextState> { + self.context + .get_slot_mut::(self.scope) + .ok_or_else(|| anyhow::anyhow!("ContextState not found in context")) + } + + pub fn syscall( + scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + mut rv: v8::ReturnValue, + ) { + let mut ctx = CallbackContext::new(scope); + match ctx.syscall_impl(args) { + Ok(v) => rv.set(v), + Err(e) => ctx.handle_syscall_or_op_error(e), + } + } + + fn syscall_impl( + &mut self, + args: v8::FunctionCallbackArguments, + ) -> anyhow::Result> { + if args.length() != 2 { + // There's not really an expected developer mistake that would lead to them + // calling Convex.syscall incorrectly -- the bug must be in our + // convex/server code. Treat this as a system error. + anyhow::bail!("syscall(name, arg_object) takes two arguments"); + } + let name: v8::Local = args.get(0).try_into()?; + let name = to_rust_string(self.scope, &name)?; + + let args_v8: v8::Local = args.get(1).try_into()?; + let args_s = to_rust_string(self.scope, &args_v8)?; + let args_v: JsonValue = serde_json::from_str(&args_s).map_err(|e| { + anyhow::anyhow!(ErrorMetadata::bad_request( + "SyscallArgsInvalidJson", + format!("Received invalid json: {e}"), + )) + })?; + + let result = self.context_state()?.environment.syscall(&name, args_v)?; + + let value_s = serde_json::to_string(&result)?; + let value_v8 = v8::String::new(self.scope, &value_s[..]) + .ok_or_else(|| anyhow!("Failed to create result string"))?; + + Ok(value_v8.into()) + } + + pub fn async_syscall( + scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + mut rv: v8::ReturnValue, + ) { + let mut ctx = CallbackContext::new(scope); + match ctx.start_async_syscall_impl(args) { + Ok(p) => rv.set(p.into()), + Err(e) => ctx.handle_syscall_or_op_error(e), + } + } + + fn start_async_syscall_impl( + &mut self, + args: v8::FunctionCallbackArguments, + ) -> anyhow::Result> { + if args.length() != 2 { + // There's not really an expected developer mistake that would lead to them + // calling Convex.asyncSyscall incorrectly -- the bug must be in our + // convex/server code. Treat this as a system error. + anyhow::bail!("asyncSyscall(name, arg_object) takes two arguments"); + } + let name: v8::Local = args.get(0).try_into()?; + let name = to_rust_string(self.scope, &name)?; + + let args_v8: v8::Local = args.get(1).try_into()?; + let args_s = to_rust_string(self.scope, &args_v8)?; + let args_v: JsonValue = serde_json::from_str(&args_s).map_err(|e| { + anyhow::anyhow!(ErrorMetadata::bad_request( + "SyscallArgsInvalidJson", + format!("Received invalid json: {e}"), + )) + })?; + + let promise_resolver = v8::PromiseResolver::new(self.scope) + .ok_or_else(|| anyhow::anyhow!("Failed to create v8::PromiseResolver"))?; + + let promise = promise_resolver.get_promise(self.scope); + let resolver = v8::Global::new(self.scope, promise_resolver); + { + let context_state = self.context_state()?; + + let promise_id = context_state.next_promise_id; + context_state.next_promise_id += 1; + + let pending_async_syscall = PendingAsyncSyscall { + promise_id, + name, + args: args_v, + }; + context_state + .pending_async_syscalls + .push(pending_async_syscall); + + context_state.promise_resolvers.insert(promise_id, resolver); + }; + Ok(promise) + } + + pub fn op( + scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + rv: v8::ReturnValue, + ) { + let mut ctx = CallbackContext::new(scope); + if let Err(e) = run_op(&mut ctx, args, rv) { + ctx.handle_syscall_or_op_error(e); + } + } + + pub extern "C" fn promise_reject_callback(message: v8::PromiseRejectMessage) { + let mut scope = unsafe { v8::CallbackScope::new(&message) }; + + // NB: If we didn't `Context::enter` above in the stack, it's possible + // that our scope will be attached to the default context at the top of the + // stack, which then won't have the `RequestState` slot. This will then cause + // the call into `ctx.push_unhandled_promise_rejection` to fail with a system + // error, which we'll just trace out here. + let mut ctx = CallbackContext::new(&mut scope); + + if let Err(e) = ctx.push_unhandled_promise_rejection(message) { + tracing::error!("Error in promise_reject_callback: {:?}", e); + } + } + + fn push_unhandled_promise_rejection( + &mut self, + message: v8::PromiseRejectMessage, + ) -> anyhow::Result<()> { + match message.get_event() { + v8::PromiseRejectEvent::PromiseRejectWithNoHandler => { + // See comment on PendingUnhandledPromiseRejections. + // A promise rejection is necessary but not sufficient for an + // 'unhandledRejection' event, which throws in our runtime. + // Save the promise and check back in on it once the microtask + // queue has drained. If it remains unhandled then, throw. + let Some(e) = message.get_value() else { + tracing::warn!("Message missing from call to promise_reject_callback"); + return Ok(()); + }; + let error_global = v8::Global::new(self.scope, e); + let promise_global = v8::Global::new(self.scope, message.get_promise()); + self.context_state()? + .unhandled_promise_rejections + .insert(promise_global, error_global); + }, + v8::PromiseRejectEvent::PromiseHandlerAddedAfterReject => { + tracing::warn!("Promise handler added after reject"); + // If this promise was previously a candidate for an + // 'unhandledRejection' event, disqualify it by removing it + // from `pending_unhandled_promise_rejections`. + let promise_global = v8::Global::new(self.scope, message.get_promise()); + self.context_state()? + .unhandled_promise_rejections + .remove(&promise_global); + // log_promise_handler_added_after_reject(); + }, + v8::PromiseRejectEvent::PromiseRejectAfterResolved => { + tracing::warn!("Promise rejected after resolved"); + }, + v8::PromiseRejectEvent::PromiseResolveAfterResolved => { + tracing::warn!("Promise resolved after resolved"); + }, + } + Ok(()) + } + + pub fn resolve_module( + context: v8::Local<'callback, v8::Context>, + specifier: v8::Local<'callback, v8::String>, + _import_assertions: v8::Local<'callback, v8::FixedArray>, + referrer: v8::Local<'callback, v8::Module>, + ) -> Option> { + let mut scope = unsafe { v8::CallbackScope::new(context) }; + let mut ctx = CallbackContext::new(&mut scope); + ctx.resolve_module_impl(specifier, referrer) + } + + fn resolve_module_impl( + &mut self, + specifier: v8::Local<'scope, v8::String>, + referrer: v8::Local<'scope, v8::Module>, + ) -> Option> { + let r: anyhow::Result<_> = try { + let referrer_global = v8::Global::new(self.scope, referrer); + let specifier_str = helpers::to_rust_string(self.scope, &specifier)?; + let context_state = self.context_state()?; + let referrer_name = context_state + .module_map + .lookup_by_v8_module(&referrer_global) + .ok_or_else(|| anyhow!("Module not registered"))? + .to_string(); + let resolved_specifier = deno_core::resolve_import(&specifier_str, &referrer_name)?; + let module = context_state + .module_map + .lookup_module(&resolved_specifier) + .ok_or_else(|| anyhow!("Couldn't find {resolved_specifier}"))? + .clone(); + v8::Local::new(self.scope, module) + }; + match r { + Ok(m) => Some(m), + Err(e) => { + // XXX: This should be a system error! + helpers::throw_type_error(self.scope, format!("{:?}", e)); + None + }, + } + } + + pub fn dynamic_import_callback<'a>( + scope: &mut v8::HandleScope<'a>, + _host_defined_options: v8::Local<'a, v8::Data>, + resource_name: v8::Local<'a, v8::Value>, + specifier: v8::Local<'a, v8::String>, + _import_assertions: v8::Local<'a, v8::FixedArray>, + ) -> Option> { + let mut ctx = CallbackContext::new(scope); + match ctx.start_dynamic_import(resource_name, specifier) { + Ok(promise) => Some(promise), + Err(e) => { + // XXX: distinguish between system and user errors here. + helpers::throw_type_error(scope, format!("{:?}", e)); + None + }, + } + } + + pub fn start_dynamic_import( + &mut self, + resource_name: v8::Local<'scope, v8::Value>, + specifier: v8::Local<'scope, v8::String>, + ) -> anyhow::Result> { + let promise_resolver = v8::PromiseResolver::new(self.scope) + .ok_or_else(|| anyhow::anyhow!("Failed to create v8::PromiseResolver"))?; + + let promise = promise_resolver.get_promise(self.scope); + let resolver = v8::Global::new(self.scope, promise_resolver); + + let resource_name: v8::Local = resource_name.try_into()?; + let referrer_name = helpers::to_rust_string(self.scope, &resource_name)?; + let specifier_str = helpers::to_rust_string(self.scope, &specifier)?; + + let resolved_specifier = deno_core::resolve_import(&specifier_str, &referrer_name) + .map_err(|e| ErrorMetadata::bad_request("InvalidImport", e.to_string()))?; + + self.context_state()? + .pending_dynamic_imports + .push((resolved_specifier, resolver)); + + Ok(promise) + } + + fn handle_syscall_or_op_error(&mut self, err: anyhow::Error) { + if let Some(uncatchable_error) = err.downcast_ref::() { + // TODO: Terminate the isolate. + let message = uncatchable_error.js_error.message.to_string(); + let message_v8 = v8::String::new(self.scope, &message[..]).unwrap(); + let exception = v8::Exception::error(self.scope, message_v8); + self.scope.throw_exception(exception); + return; + } + + if err.is_deterministic_user_error() { + let message = err.user_facing_message(); + let message_v8 = v8::String::new(self.scope, &message[..]).unwrap(); + let exception = v8::Exception::error(self.scope, message_v8); + self.scope.throw_exception(exception); + return; + } + + // TODO: Handle system errors. + todo!(); + } +} + +mod op_provider { + use std::collections::BTreeMap; + + use bytes::Bytes; + use common::{ + log_lines::LogLevel, + runtime::UnixTimestamp, + types::{ + EnvVarName, + EnvVarValue, + }, + }; + use deno_core::{ + v8, + ModuleSpecifier, + }; + use rand_chacha::ChaCha12Rng; + use sourcemap::SourceMap; + use uuid::Uuid; + use value::{ + heap_size::WithHeapSize, + TableMapping, + TableMappingValue, + VirtualTableMapping, + }; + + use super::CallbackContext; + use crate::{ + environment::AsyncOpRequest, + ops::OpProvider, + request_scope::StreamListener, + }; + + impl<'callback, 'scope: 'callback> OpProvider<'scope> for CallbackContext<'callback, 'scope> { + fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> { + let state = self.context_state()?; + state.environment.rng() + } + + fn scope(&mut self) -> &mut v8::HandleScope<'scope> { + self.scope + } + + fn lookup_source_map( + &mut self, + specifier: &ModuleSpecifier, + ) -> anyhow::Result> { + let context_state = self.context_state()?; + let Some(source_map) = context_state.module_map.lookup_source_map(specifier) else { + return Ok(None); + }; + Ok(Some(SourceMap::from_slice(source_map.as_bytes())?)) + } + + fn trace(&mut self, level: LogLevel, messages: Vec) -> anyhow::Result<()> { + self.context_state()?.environment.trace(level, messages) + } + + fn console_timers( + &mut self, + ) -> anyhow::Result<&mut WithHeapSize>> { + todo!() + } + + fn unix_timestamp(&mut self) -> anyhow::Result { + self.context_state()?.environment.unix_timestamp() + } + + fn unix_timestamp_non_deterministic(&mut self) -> anyhow::Result { + todo!() + } + + fn start_async_op( + &mut self, + _request: AsyncOpRequest, + _resolver: v8::Global, + ) -> anyhow::Result<()> { + todo!(); + } + + fn create_blob_part(&mut self, _bytes: Bytes) -> anyhow::Result { + todo!() + } + + fn get_blob_part(&mut self, _uuid: &Uuid) -> anyhow::Result> { + todo!() + } + + fn create_stream(&mut self) -> anyhow::Result { + todo!() + } + + fn extend_stream( + &mut self, + _id: Uuid, + _bytes: Option, + _new_done: bool, + ) -> anyhow::Result<()> { + todo!() + } + + fn new_stream_listener( + &mut self, + _stream_id: Uuid, + _listener: StreamListener, + ) -> anyhow::Result<()> { + todo!(); + } + + fn get_environment_variable( + &mut self, + _name: EnvVarName, + ) -> anyhow::Result> { + todo!() + } + + fn get_all_table_mappings( + &mut self, + ) -> anyhow::Result<(TableMapping, VirtualTableMapping)> { + todo!() + } + + fn get_table_mapping_without_system_tables(&mut self) -> anyhow::Result { + todo!() + } + } +} diff --git a/crates/isolate/src/isolate2/client.rs b/crates/isolate/src/isolate2/client.rs index ffb92944..568b3fbd 100644 --- a/crates/isolate/src/isolate2/client.rs +++ b/crates/isolate/src/isolate2/client.rs @@ -55,18 +55,23 @@ pub enum IsolateThreadRequest { #[derive(Debug)] pub enum EvaluateResult { - Ready(ReadyEvaluateResult), - Pending { - async_syscalls: Vec, - }, + Ready(EvaluateReady), + Pending(EvaluatePending), } #[derive(Debug)] -pub struct ReadyEvaluateResult { +pub struct EvaluateReady { pub result: ConvexValue, pub outcome: EnvironmentOutcome, } +#[derive(Debug)] +pub struct EvaluatePending { + pub async_syscalls: Vec, +} + +pub type QueryId = u32; + #[derive(Debug)] pub struct PendingAsyncSyscall { pub promise_id: PromiseId, diff --git a/crates/isolate/src/isolate2/context.rs b/crates/isolate/src/isolate2/context.rs index 3bff89e0..976906f9 100644 --- a/crates/isolate/src/isolate2/context.rs +++ b/crates/isolate/src/isolate2/context.rs @@ -11,6 +11,7 @@ use deno_core::{ use value::ConvexObject; use super::{ + callback_context::CallbackContext, client::{ AsyncSyscallCompletion, EvaluateResult, @@ -21,10 +22,7 @@ use super::{ session::Session, FunctionId, }; -use crate::{ - ops::run_op, - strings, -}; +use crate::strings; // Each isolate session can have multiple contexts, which we'll eventually use // for subtransactions. Each context executes with a particular environment, @@ -49,14 +47,15 @@ impl Context { let convex_value = v8::Object::new(&mut scope); - let syscall_template = v8::FunctionTemplate::new(&mut scope, Self::syscall); + let syscall_template = v8::FunctionTemplate::new(&mut scope, CallbackContext::syscall); let syscall_value = syscall_template .get_function(&mut scope) .ok_or_else(|| anyhow!("Failed to retrieve function from FunctionTemplate"))?; let syscall_key = strings::syscall.create(&mut scope)?; convex_value.set(&mut scope, syscall_key.into(), syscall_value.into()); - let async_syscall_template = v8::FunctionTemplate::new(&mut scope, Self::async_syscall); + let async_syscall_template = + v8::FunctionTemplate::new(&mut scope, CallbackContext::async_syscall); let async_syscall_value = async_syscall_template .get_function(&mut scope) .ok_or_else(|| anyhow!("Failed to retrieve function from FunctionTemplate"))?; @@ -67,7 +66,7 @@ impl Context { async_syscall_value.into(), ); - let op_template = v8::FunctionTemplate::new(&mut scope, Self::op); + let op_template = v8::FunctionTemplate::new(&mut scope, CallbackContext::op); let op_value = op_template .get_function(&mut scope) .ok_or_else(|| anyhow!("Failed to retrieve function from FunctionTemplate"))?; @@ -134,66 +133,4 @@ impl Context { } Ok(result) } - - pub fn syscall( - scope: &mut v8::HandleScope, - args: v8::FunctionCallbackArguments, - mut rv: v8::ReturnValue, - ) { - let mut ctx = EnteredContext::from_callback(scope); - match ctx.syscall(args) { - Ok(v) => rv.set(v), - Err(_e) => { - // XXX: Handle syscall or op error. - // let message = strings::syscallError.create(scope).unwrap(); - // let exception = v8::Exception::error(scope, message); - // scope.throw_exception(exception); - todo!(); - }, - } - } - - pub fn async_syscall( - scope: &mut v8::HandleScope, - args: v8::FunctionCallbackArguments, - mut rv: v8::ReturnValue, - ) { - let mut ctx = EnteredContext::from_callback(scope); - match ctx.start_async_syscall(args) { - Ok(p) => rv.set(p.into()), - Err(_e) => { - // XXX: Handle syscall or op error. - // let message = strings::syscallError.create(scope).unwrap(); - // let exception = v8::Exception::error(scope, message); - // scope.throw_exception(exception); - todo!(); - }, - } - } - - pub fn op( - scope: &mut v8::HandleScope, - args: v8::FunctionCallbackArguments, - rv: v8::ReturnValue, - ) { - let mut ctx = EnteredContext::from_callback(scope); - if let Err(e) = run_op(&mut ctx, args, rv) { - // XXX: Handle syscall or op error. - // let message = strings::syscallError.create(scope).unwrap(); - // let exception = v8::Exception::error(scope, message); - // scope.throw_exception(exception); - panic!("Unexpected error: {e:?}"); - } - } - - pub fn module_resolve_callback<'callback>( - context: v8::Local<'callback, v8::Context>, - specifier: v8::Local<'callback, v8::String>, - _import_assertions: v8::Local<'callback, v8::FixedArray>, - referrer: v8::Local<'callback, v8::Module>, - ) -> Option> { - let mut scope = unsafe { v8::CallbackScope::new(context) }; - let mut ctx = EnteredContext::from_callback(&mut scope); - ctx.resolve_module(specifier, referrer) - } } diff --git a/crates/isolate/src/isolate2/entered_context.rs b/crates/isolate/src/isolate2/entered_context.rs index 915c4a76..875d8ad1 100644 --- a/crates/isolate/src/isolate2/entered_context.rs +++ b/crates/isolate/src/isolate2/entered_context.rs @@ -10,8 +10,8 @@ use deno_core::{ v8, ModuleSpecifier, }; -use errors::ErrorMetadata; use serde_json::Value as JsonValue; +use sourcemap::SourceMap; use value::{ ConvexObject, ConvexValue, @@ -20,9 +20,9 @@ use value::{ use super::{ client::{ AsyncSyscallCompletion, + EvaluatePending, + EvaluateReady, EvaluateResult, - PendingAsyncSyscall, - ReadyEvaluateResult, }, context_state::ContextState, }; @@ -35,13 +35,11 @@ use crate::{ to_rust_string, }, isolate::SETUP_URL, - isolate2::context::Context, + isolate2::callback_context::CallbackContext, metrics, - ops::OpProvider, strings, }; -// 'scope can either be 'session or 'callback pub struct EnteredContext<'enter, 'scope: 'enter> { scope: &'enter mut v8::HandleScope<'scope>, context: v8::Local<'scope, v8::Context>, @@ -55,11 +53,6 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> { Self { scope, context } } - pub fn from_callback(scope: &'enter mut v8::HandleScope<'scope>) -> Self { - let context = scope.get_current_context(); - Self { scope, context } - } - pub fn context_state_mut(&mut self) -> anyhow::Result<&mut ContextState> { self.context .get_slot_mut::(self.scope) @@ -206,9 +199,8 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> { s => anyhow::bail!("Module {url} is in invalid state: {s:?}"), } - let instantiation_result = self.execute_user_code(|s| { - module.instantiate_module(s, Context::module_resolve_callback) - })?; + let instantiation_result = self + .execute_user_code(|s| module.instantiate_module(s, CallbackContext::resolve_module))?; if matches!(instantiation_result, Some(false) | None) { anyhow::bail!("Unexpected successful instantiate result: {instantiation_result:?}"); @@ -242,63 +234,6 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> { Ok(module) } - pub fn resolve_module( - &mut self, - specifier: v8::Local<'scope, v8::String>, - referrer: v8::Local<'scope, v8::Module>, - ) -> Option> { - let r: anyhow::Result<_> = try { - let referrer_global = v8::Global::new(self.scope, referrer); - let specifier_str = helpers::to_rust_string(self.scope, &specifier)?; - let context_state = self.context_state()?; - let referrer_name = context_state - .module_map - .lookup_by_v8_module(&referrer_global) - .ok_or_else(|| anyhow!("Module not registered"))? - .to_string(); - let resolved_specifier = deno_core::resolve_import(&specifier_str, &referrer_name)?; - let module = context_state - .module_map - .lookup_module(&resolved_specifier) - .ok_or_else(|| anyhow!("Couldn't find {resolved_specifier}"))? - .clone(); - v8::Local::new(self.scope, module) - }; - match r { - Ok(m) => Some(m), - Err(e) => { - // XXX: This should be a system error! - helpers::throw_type_error(self.scope, format!("{:?}", e)); - None - }, - } - } - - pub fn start_dynamic_import( - &mut self, - resource_name: v8::Local<'scope, v8::Value>, - specifier: v8::Local<'scope, v8::String>, - ) -> anyhow::Result> { - let promise_resolver = v8::PromiseResolver::new(self.scope) - .ok_or_else(|| anyhow::anyhow!("Failed to create v8::PromiseResolver"))?; - - let promise = promise_resolver.get_promise(self.scope); - let resolver = v8::Global::new(self.scope, promise_resolver); - - let resource_name: v8::Local = resource_name.try_into()?; - let referrer_name = helpers::to_rust_string(self.scope, &resource_name)?; - let specifier_str = helpers::to_rust_string(self.scope, &specifier)?; - - let resolved_specifier = deno_core::resolve_import(&specifier_str, &referrer_name) - .map_err(|e| ErrorMetadata::bad_request("InvalidImport", e.to_string()))?; - - self.context_state_mut()? - .pending_dynamic_imports - .push((resolved_specifier, resolver)); - - Ok(promise) - } - pub fn start_evaluate_function( &mut self, udf_type: UdfType, @@ -435,9 +370,10 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> { ) -> anyhow::Result { match promise.state() { v8::PromiseState::Pending => { - let async_syscalls = - mem::take(&mut self.context_state_mut()?.pending_async_syscalls); - Ok(EvaluateResult::Pending { async_syscalls }) + let context = self.context_state_mut()?; + let async_syscalls = mem::take(&mut context.pending_async_syscalls); + let pending = EvaluatePending { async_syscalls }; + Ok(EvaluateResult::Pending(pending)) }, v8::PromiseState::Fulfilled => { let result: v8::Local = promise.result(self.scope).try_into()?; @@ -446,10 +382,7 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> { let result_json: JsonValue = serde_json::from_str(&result)?; let result = ConvexValue::try_from(result_json)?; let outcome = self.context_state_mut()?.environment.finish_execution()?; - Ok(EvaluateResult::Ready(ReadyEvaluateResult { - result, - outcome, - })) + Ok(EvaluateResult::Ready(EvaluateReady { result, outcome })) }, v8::PromiseState::Rejected => { todo!() @@ -457,129 +390,6 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> { } } - pub fn syscall( - &mut self, - args: v8::FunctionCallbackArguments, - ) -> anyhow::Result> { - if args.length() != 2 { - // There's not really an expected developer mistake that would lead to them - // calling Convex.syscall incorrectly -- the bug must be in our - // convex/server code. Treat this as a system error. - anyhow::bail!("syscall(name, arg_object) takes two arguments"); - } - let name: v8::Local = args.get(0).try_into()?; - let name = to_rust_string(self.scope, &name)?; - - let args_v8: v8::Local = args.get(1).try_into()?; - let args_s = to_rust_string(self.scope, &args_v8)?; - let args_v: JsonValue = serde_json::from_str(&args_s).map_err(|e| { - anyhow::anyhow!(ErrorMetadata::bad_request( - "SyscallArgsInvalidJson", - format!("Received invalid json: {e}"), - )) - })?; - - let result = self - .context_state_mut()? - .environment - .syscall(&name, args_v)?; - - let value_s = serde_json::to_string(&result)?; - let value_v8 = v8::String::new(self.scope, &value_s[..]) - .ok_or_else(|| anyhow!("Failed to create result string"))?; - - Ok(value_v8.into()) - } - - pub fn start_async_syscall( - &mut self, - args: v8::FunctionCallbackArguments, - ) -> anyhow::Result> { - if args.length() != 2 { - // There's not really an expected developer mistake that would lead to them - // calling Convex.asyncSyscall incorrectly -- the bug must be in our - // convex/server code. Treat this as a system error. - anyhow::bail!("asyncSyscall(name, arg_object) takes two arguments"); - } - let name: v8::Local = args.get(0).try_into()?; - let name = to_rust_string(self.scope, &name)?; - - let args_v8: v8::Local = args.get(1).try_into()?; - let args_s = to_rust_string(self.scope, &args_v8)?; - let args_v: JsonValue = serde_json::from_str(&args_s).map_err(|e| { - anyhow::anyhow!(ErrorMetadata::bad_request( - "SyscallArgsInvalidJson", - format!("Received invalid json: {e}"), - )) - })?; - - let promise_resolver = v8::PromiseResolver::new(self.scope) - .ok_or_else(|| anyhow::anyhow!("Failed to create v8::PromiseResolver"))?; - - let promise = promise_resolver.get_promise(self.scope); - let resolver = v8::Global::new(self.scope, promise_resolver); - { - let context_state = self.context_state_mut()?; - - let promise_id = context_state.next_promise_id; - context_state.next_promise_id += 1; - - let pending_async_syscall = PendingAsyncSyscall { - promise_id, - name, - args: args_v, - }; - context_state - .pending_async_syscalls - .push(pending_async_syscall); - - context_state.promise_resolvers.insert(promise_id, resolver); - }; - Ok(promise) - } - - pub fn push_unhandled_promise_rejection( - &mut self, - message: v8::PromiseRejectMessage, - ) -> anyhow::Result<()> { - match message.get_event() { - v8::PromiseRejectEvent::PromiseRejectWithNoHandler => { - // See comment on PendingUnhandledPromiseRejections. - // A promise rejection is necessary but not sufficient for an - // 'unhandledRejection' event, which throws in our runtime. - // Save the promise and check back in on it once the microtask - // queue has drained. If it remains unhandled then, throw. - let Some(e) = message.get_value() else { - tracing::warn!("Message missing from call to promise_reject_callback"); - return Ok(()); - }; - let error_global = v8::Global::new(self.scope, e); - let promise_global = v8::Global::new(self.scope, message.get_promise()); - self.context_state_mut()? - .unhandled_promise_rejections - .insert(promise_global, error_global); - }, - v8::PromiseRejectEvent::PromiseHandlerAddedAfterReject => { - tracing::warn!("Promise handler added after reject"); - // If this promise was previously a candidate for an - // 'unhandledRejection' event, disqualify it by removing it - // from `pending_unhandled_promise_rejections`. - let promise_global = v8::Global::new(self.scope, message.get_promise()); - self.context_state_mut()? - .unhandled_promise_rejections - .remove(&promise_global); - // log_promise_handler_added_after_reject(); - }, - v8::PromiseRejectEvent::PromiseRejectAfterResolved => { - tracing::warn!("Promise rejected after resolved"); - }, - v8::PromiseRejectEvent::PromiseResolveAfterResolved => { - tracing::warn!("Promise resolved after resolved"); - }, - } - Ok(()) - } - pub fn format_traceback(&mut self, exception: v8::Local) -> anyhow::Result { // Check if we hit a system error or timeout and can't run any JavaScript now. // Abort with a system error here, and we'll (in the best case) pull out @@ -591,7 +401,11 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> { let (message, frame_data, custom_data) = extract_source_mapped_error(self.scope, exception)?; JsError::from_frames(message, frame_data, custom_data, |s| { - self.lookup_source_map(s) + let context_state = self.context_state()?; + let Some(source_map) = context_state.module_map.lookup_source_map(s) else { + return Ok(None); + }; + Ok(Some(SourceMap::from_slice(source_map.as_bytes())?)) }) }; let err = match err { @@ -607,131 +421,3 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> { Ok(err) } } - -mod op_provider { - use std::collections::BTreeMap; - - use bytes::Bytes; - use common::{ - log_lines::LogLevel, - runtime::UnixTimestamp, - types::{ - EnvVarName, - EnvVarValue, - }, - }; - use deno_core::{ - v8, - ModuleSpecifier, - }; - use rand_chacha::ChaCha12Rng; - use sourcemap::SourceMap; - use uuid::Uuid; - use value::{ - heap_size::WithHeapSize, - TableMapping, - TableMappingValue, - VirtualTableMapping, - }; - - use super::EnteredContext; - use crate::{ - environment::AsyncOpRequest, - ops::OpProvider, - request_scope::StreamListener, - }; - - impl<'enter, 'scope: 'enter> OpProvider<'scope> for EnteredContext<'enter, 'scope> { - fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> { - let state = self.context_state_mut()?; - state.environment.rng() - } - - fn scope(&mut self) -> &mut v8::HandleScope<'scope> { - self.scope - } - - fn lookup_source_map( - &mut self, - specifier: &ModuleSpecifier, - ) -> anyhow::Result> { - let context_state = self.context_state()?; - let Some(source_map) = context_state.module_map.lookup_source_map(specifier) else { - return Ok(None); - }; - Ok(Some(SourceMap::from_slice(source_map.as_bytes())?)) - } - - fn trace(&mut self, level: LogLevel, messages: Vec) -> anyhow::Result<()> { - self.context_state_mut()?.environment.trace(level, messages) - } - - fn console_timers( - &mut self, - ) -> anyhow::Result<&mut WithHeapSize>> { - todo!() - } - - fn unix_timestamp(&mut self) -> anyhow::Result { - self.context_state_mut()?.environment.unix_timestamp() - } - - fn unix_timestamp_non_deterministic(&mut self) -> anyhow::Result { - todo!() - } - - fn start_async_op( - &mut self, - _request: AsyncOpRequest, - _resolver: v8::Global, - ) -> anyhow::Result<()> { - todo!(); - } - - fn create_blob_part(&mut self, _bytes: Bytes) -> anyhow::Result { - todo!() - } - - fn get_blob_part(&mut self, _uuid: &Uuid) -> anyhow::Result> { - todo!() - } - - fn create_stream(&mut self) -> anyhow::Result { - todo!() - } - - fn extend_stream( - &mut self, - _id: Uuid, - _bytes: Option, - _new_done: bool, - ) -> anyhow::Result<()> { - todo!() - } - - fn new_stream_listener( - &mut self, - _stream_id: Uuid, - _listener: StreamListener, - ) -> anyhow::Result<()> { - todo!(); - } - - fn get_environment_variable( - &mut self, - _name: EnvVarName, - ) -> anyhow::Result> { - todo!() - } - - fn get_all_table_mappings( - &mut self, - ) -> anyhow::Result<(TableMapping, VirtualTableMapping)> { - todo!() - } - - fn get_table_mapping_without_system_tables(&mut self) -> anyhow::Result { - todo!() - } - } -} diff --git a/crates/isolate/src/isolate2/mod.rs b/crates/isolate/src/isolate2/mod.rs index aa481357..2521b0d4 100644 --- a/crates/isolate/src/isolate2/mod.rs +++ b/crates/isolate/src/isolate2/mod.rs @@ -1,3 +1,4 @@ +pub mod callback_context; pub mod client; pub mod context; pub mod context_state; diff --git a/crates/isolate/src/isolate2/runner.rs b/crates/isolate/src/isolate2/runner.rs index eeb20565..20b0280f 100644 --- a/crates/isolate/src/isolate2/runner.rs +++ b/crates/isolate/src/isolate2/runner.rs @@ -1,17 +1,6 @@ -// TODO: -// - QueryManager, lazy query initialization -// - Source maps on V8 thread -// - Sending table mappings to V8 thread -// - Environment variables and lazy read set size check -// - Log streaming -// - Changing invocation API to be less UDF centric -// - Timer for logging user time from tokio thread -// - Error handling -// - Regular actions -// - HTTP actions -// - Other environments (schema, auth.config.js, analyze) use std::{ cmp::Ordering, + collections::BTreeMap, sync::Arc, time::Duration, }; @@ -24,6 +13,7 @@ use common::{ LogLine, LogLines, }, + query::Query, query_journal::QueryJournal, runtime::{ Runtime, @@ -34,9 +24,11 @@ use common::{ PersistenceVersion, UdfType, }, + version::Version, }; use database::{ query::TableFilter, + DeveloperQuery, Transaction, }; use errors::ErrorMetadata; @@ -49,6 +41,11 @@ use futures::{ StreamExt, }; use keybroker::KeyBroker; +use model::file_storage::{ + types::FileStorageEntry, + FileStorageId, +}; +use parking_lot::Mutex; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use serde_json::Value as JsonValue; @@ -58,17 +55,24 @@ use sync_types::{ }; use tokio::sync::Semaphore; use value::{ + ConvexArray, ConvexObject, ConvexValue, + TableIdAndTableNumber, + TableMapping, + TableName, + TableNumber, + VirtualTableMapping, }; use super::{ client::{ AsyncSyscallCompletion, + EvaluateReady, EvaluateResult, IsolateThreadClient, IsolateThreadRequest, - ReadyEvaluateResult, + QueryId, }, context::Context, environment::{ @@ -93,7 +97,7 @@ use crate::{ AsyncSyscallBatch, AsyncSyscallProvider, DatabaseSyscallsV1, - QueryManager, + ManagedQuery, }, syscall::{ syscall_impl, @@ -185,12 +189,16 @@ pub struct SeedData { struct UdfEnvironment { rt: RT, + is_system: bool, + log_lines: LogLines, import_time_seed: SeedData, execution_time_seed: SeedData, phase: UdfPhase, + + shared: UdfShared, } #[derive(Debug)] @@ -207,31 +215,67 @@ enum UdfPhase { } impl UdfEnvironment { - pub fn new(rt: RT, import_time_seed: SeedData, execution_time_seed: SeedData) -> Self { + pub fn new( + rt: RT, + is_system: bool, + import_time_seed: SeedData, + execution_time_seed: SeedData, + shared: UdfShared, + ) -> Self { let rng = ChaCha12Rng::from_seed(import_time_seed.rng_seed); Self { rt, + is_system, log_lines: vec![].into(), import_time_seed, execution_time_seed, phase: UdfPhase::Importing { rng }, + + shared, } } + + fn check_executing(&self) -> anyhow::Result<()> { + let UdfPhase::Executing { .. } = self.phase else { + // TODO: Is this right? Should we just be using JsError? + anyhow::bail!(ErrorMetadata::bad_request( + "NoDbDuringImport", + "Can't use database at import time", + )) + }; + Ok(()) + } } impl SyscallProvider for UdfEnvironment { fn table_filter(&self) -> TableFilter { - todo!(); + if self.is_system { + TableFilter::IncludePrivateSystemTables + } else { + TableFilter::ExcludePrivateSystemTables + } } - fn query_manager(&mut self) -> &mut QueryManager { - todo!(); + fn lookup_table(&mut self, name: &TableName) -> anyhow::Result> { + self.check_executing()?; + self.shared.lookup_table(name) } - fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata> { - todo!(); + fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result> { + self.check_executing()?; + self.shared.lookup_virtual_table(name) + } + + fn start_query(&mut self, query: Query, version: Option) -> anyhow::Result { + self.check_executing()?; + let query_id = self.shared.start_query(query, version); + Ok(query_id) + } + + fn cleanup_query(&mut self, query_id: u32) -> bool { + self.shared.cleanup_query(query_id) } } @@ -364,6 +408,7 @@ async fn run_request( udf_type: UdfType, udf_path: CanonicalizedUdfPath, args: ConvexObject, + shared: UdfShared, ) -> anyhow::Result { // Phase 1: Load and register all source needed, and evaluate the UDF's module. let r: anyhow::Result<_> = try { @@ -417,15 +462,15 @@ async fn run_request( }, }; - let mut provider = Isolate2SyscallProvider { + let mut provider = Isolate2SyscallProvider::new( tx, rt, - query_manager: QueryManager::new(), - unix_timestamp: execution_time_seed.unix_timestamp, - prev_journal: QueryJournal::new(), - next_journal: QueryJournal::new(), - is_system: udf_path.is_system(), - }; + execution_time_seed.unix_timestamp, + QueryJournal::new(), + QueryJournal::new(), + udf_path.is_system(), + shared, + ); // Phase 2: Start the UDF, execute its async syscalls, and poll until // completion. @@ -439,16 +484,16 @@ async fn run_request( ) .await?; loop { - let async_syscalls = match result { + let pending = match result { EvaluateResult::Ready(r) => break r, - EvaluateResult::Pending { async_syscalls } => async_syscalls, + EvaluateResult::Pending(p) => p, }; let mut completions = vec![]; let mut syscall_batch = None; let mut batch_promise_ids = vec![]; - for async_syscall in async_syscalls { + for async_syscall in pending.async_syscalls { let promise_id = async_syscall.promise_id; match syscall_batch { None => { @@ -494,7 +539,7 @@ async fn run_request( }; let (result, outcome) = match r { - Ok(ReadyEvaluateResult { result, outcome }) => (Ok(JsonPackedValue::pack(result)), outcome), + Ok(EvaluateReady { result, outcome }) => (Ok(JsonPackedValue::pack(result)), outcome), Err(e) => { let js_error = e.downcast::()?; // TODO: Ask the V8 thread for its outcome. @@ -516,17 +561,85 @@ async fn run_request( log_lines: vec![].into(), journal: provider.next_journal, result, - syscall_trace: SyscallTrace::new(), + syscall_trace: provider.syscall_trace, udf_server_version: None, }; Ok(outcome) } +struct UdfShared { + inner: Arc>>, +} + +impl Clone for UdfShared { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl UdfShared { + pub fn new(table_mapping: TableMapping, virtual_table_mapping: VirtualTableMapping) -> Self { + Self { + inner: Arc::new(Mutex::new(UdfSharedInner { + next_query_id: 0, + queries: BTreeMap::new(), + table_mapping, + virtual_table_mapping, + })), + } + } + + fn lookup_table(&mut self, name: &TableName) -> anyhow::Result> { + let inner = self.inner.lock(); + Ok(inner.table_mapping.id_and_number_if_exists(name)) + } + + fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result> { + let inner = self.inner.lock(); + Ok(inner.virtual_table_mapping.number_if_exists(name)) + } + + fn start_query(&self, query: Query, version: Option) -> QueryId { + let mut inner = self.inner.lock(); + let query_id = inner.next_query_id; + inner.next_query_id += 1; + inner + .queries + .insert(query_id, ManagedQuery::Pending { query, version }); + query_id + } + + fn take_query(&self, query_id: QueryId) -> Option> { + let mut inner = self.inner.lock(); + inner.queries.remove(&query_id) + } + + fn insert_query(&self, query_id: QueryId, query: DeveloperQuery) { + let mut inner = self.inner.lock(); + inner.queries.insert(query_id, ManagedQuery::Active(query)); + } + + fn cleanup_query(&self, query_id: u32) -> bool { + let mut inner = self.inner.lock(); + inner.queries.remove(&query_id).is_some() + } +} + +struct UdfSharedInner { + next_query_id: QueryId, + queries: BTreeMap>, + + table_mapping: TableMapping, + virtual_table_mapping: VirtualTableMapping, +} + struct Isolate2SyscallProvider<'a, RT: Runtime> { tx: &'a mut Transaction, rt: RT, - query_manager: QueryManager, + shared: UdfShared, unix_timestamp: UnixTimestamp, @@ -534,6 +647,31 @@ struct Isolate2SyscallProvider<'a, RT: Runtime> { next_journal: QueryJournal, is_system: bool, + + syscall_trace: SyscallTrace, +} + +impl<'a, RT: Runtime> Isolate2SyscallProvider<'a, RT> { + fn new( + tx: &'a mut Transaction, + rt: RT, + unix_timestamp: UnixTimestamp, + prev_journal: QueryJournal, + next_journal: QueryJournal, + is_system: bool, + shared: UdfShared, + ) -> Self { + Self { + tx, + rt, + shared, + unix_timestamp, + prev_journal, + next_journal, + is_system, + syscall_trace: SyscallTrace::new(), + } + } } impl<'a, RT: Runtime> AsyncSyscallProvider for Isolate2SyscallProvider<'a, RT> { @@ -542,7 +680,7 @@ impl<'a, RT: Runtime> AsyncSyscallProvider for Isolate2SyscallProvider<'a, R } fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata> { - // TODO: phases. + // TODO: Check that we're during the execution phase. Ok(self.tx) } @@ -555,7 +693,8 @@ impl<'a, RT: Runtime> AsyncSyscallProvider for Isolate2SyscallProvider<'a, R } fn unix_timestamp(&self) -> anyhow::Result { - // TODO: phases. + // TODO: Switch between the import time and execution time timestamps based on + // phase. Ok(self.unix_timestamp) } @@ -571,10 +710,9 @@ impl<'a, RT: Runtime> AsyncSyscallProvider for Isolate2SyscallProvider<'a, R } } - fn log_async_syscall(&mut self, _name: String, _duration: Duration, _is_success: bool) {} - - fn query_manager(&mut self) -> &mut QueryManager { - &mut self.query_manager + fn log_async_syscall(&mut self, name: String, duration: Duration, is_success: bool) { + self.syscall_trace + .log_async_syscall(name, duration, is_success); } fn prev_journal(&mut self) -> &mut QueryJournal { @@ -590,7 +728,7 @@ impl<'a, RT: Runtime> AsyncSyscallProvider for Isolate2SyscallProvider<'a, R _udf_path: UdfPath, _args: Vec, _scheduled_ts: UnixTimestamp, - ) -> anyhow::Result<(UdfPath, value::ConvexArray)> { + ) -> anyhow::Result<(UdfPath, ConvexArray)> { todo!() } @@ -600,24 +738,33 @@ impl<'a, RT: Runtime> AsyncSyscallProvider for Isolate2SyscallProvider<'a, R async fn file_storage_get_url( &mut self, - _storage_id: model::file_storage::FileStorageId, + _storage_id: FileStorageId, ) -> anyhow::Result> { todo!() } - async fn file_storage_delete( - &mut self, - _storage_id: model::file_storage::FileStorageId, - ) -> anyhow::Result<()> { + async fn file_storage_delete(&mut self, _storage_id: FileStorageId) -> anyhow::Result<()> { todo!() } async fn file_storage_get_entry( &mut self, - _storage_id: model::file_storage::FileStorageId, - ) -> anyhow::Result> { + _storage_id: FileStorageId, + ) -> anyhow::Result> { todo!() } + + fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery) { + self.shared.insert_query(query_id, query) + } + + fn take_query(&mut self, query_id: QueryId) -> Option> { + self.shared.take_query(query_id) + } + + fn cleanup_query(&mut self, query_id: u32) -> bool { + self.shared.cleanup_query(query_id) + } } async fn tokio_thread( @@ -631,6 +778,7 @@ async fn tokio_thread( udf_type: UdfType, udf_path: CanonicalizedUdfPath, args: ConvexObject, + shared: UdfShared, ) { let request = run_request( rt.clone(), @@ -641,6 +789,7 @@ async fn tokio_thread( udf_type, udf_path, args, + shared, ); let r = futures::select_biased! { @@ -656,7 +805,7 @@ async fn tokio_thread( pub async fn run_isolate_v2_udf( rt: RT, - tx: Transaction, + mut tx: Transaction, module_loader: Arc>, import_time_seed: SeedData, execution_time_seed: SeedData, @@ -677,7 +826,18 @@ pub async fn run_isolate_v2_udf( // based on a tx timestamp that may be out of retention. let total_timeout = Duration::from_secs(10); - let environment = UdfEnvironment::new(rt.clone(), import_time_seed, execution_time_seed); + // TODO: This unconditionally takes a table mapping dep. + let shared = UdfShared::new( + tx.table_mapping().clone(), + tx.virtual_table_mapping().clone(), + ); + let environment = UdfEnvironment::new( + rt.clone(), + udf_path.is_system(), + import_time_seed, + execution_time_seed, + shared.clone(), + ); // The protocol is synchronous, so there should never be more than // one pending request at a time. @@ -703,6 +863,7 @@ pub async fn run_isolate_v2_udf( udf_type, udf_path, args, + shared, ), ); diff --git a/crates/isolate/src/isolate2/session.rs b/crates/isolate/src/isolate2/session.rs index 2db0caa7..e5c703a0 100644 --- a/crates/isolate/src/isolate2/session.rs +++ b/crates/isolate/src/isolate2/session.rs @@ -8,10 +8,9 @@ use deno_core::v8::{ }; use super::{ - entered_context::EnteredContext, + callback_context::CallbackContext, thread::Thread, }; -use crate::helpers; // Isolate-level struct scoped to a "session," which enables isolate reuse // across sessions. @@ -37,9 +36,10 @@ impl<'a> Session<'a> { heap_ctx_ptr as *mut ffi::c_void, ); - handle_scope.set_promise_reject_callback(Self::promise_reject_callback); + handle_scope.set_promise_reject_callback(CallbackContext::promise_reject_callback); - handle_scope.set_host_import_module_dynamically_callback(Self::dynamic_import_callback); + handle_scope + .set_host_import_module_dynamically_callback(CallbackContext::dynamic_import_callback); Self { handle_scope, @@ -60,39 +60,6 @@ impl<'a> Session<'a> { // Double heap limit to avoid a hard OOM. current_heap_limit * 2 } - - extern "C" fn promise_reject_callback(message: v8::PromiseRejectMessage) { - let mut scope = unsafe { v8::CallbackScope::new(&message) }; - - // NB: If we didn't `Context::enter` above in the stack, it's possible - // that our scope will be attached to the default context at the top of the - // stack, which then won't have the `RequestState` slot. This will then cause - // the call into `ctx.push_unhandled_promise_rejection` to fail with a system - // error, which we'll just trace out here. - let mut ctx = EnteredContext::from_callback(&mut scope); - - if let Err(e) = ctx.push_unhandled_promise_rejection(message) { - tracing::error!("Error in promise_reject_callback: {:?}", e); - } - } - - fn dynamic_import_callback<'s>( - scope: &mut v8::HandleScope<'s>, - _host_defined_options: v8::Local<'s, v8::Data>, - resource_name: v8::Local<'s, v8::Value>, - specifier: v8::Local<'s, v8::String>, - _import_assertions: v8::Local<'s, v8::FixedArray>, - ) -> Option> { - let mut ctx = EnteredContext::from_callback(scope); - match ctx.start_dynamic_import(resource_name, specifier) { - Ok(promise) => Some(promise), - Err(e) => { - // XXX: distinguish between system and user errors here. - helpers::throw_type_error(scope, format!("{:?}", e)); - None - }, - } - } } impl<'a> Drop for Session<'a> { diff --git a/crates/isolate/src/tests/basic.rs b/crates/isolate/src/tests/basic.rs index af371954..53b294c4 100644 --- a/crates/isolate/src/tests/basic.rs +++ b/crates/isolate/src/tests/basic.rs @@ -196,8 +196,7 @@ async fn test_insert_increase_and_delete(rt: TestRuntime) -> anyhow::Result<()> #[convex_macro::test_runtime] async fn test_insert_and_delete(rt: TestRuntime) -> anyhow::Result<()> { - // TODO: Port to isolate2 when we support the QueryManager. - UdfTest::run_test_with_isolate(rt, async move |t: UdfTestType| { + UdfTest::run_test_with_isolate2(rt, async move |t: UdfTestType| { let value = assert_val!("I am a phantom"); must_let!(let ConvexValue::Object(obj) = t.mutation( "basic:insertAndDeleteObject", @@ -271,8 +270,7 @@ async fn test_count(rt: TestRuntime) -> anyhow::Result<()> { #[convex_macro::test_runtime] async fn test_patch(rt: TestRuntime) -> anyhow::Result<()> { - // TODO: Port to isolate2 when we fix creation time assignment. - UdfTest::run_test_with_isolate(rt, async move |t: UdfTestType| { + UdfTest::run_test_with_isolate2(rt, async move |t: UdfTestType| { // Insert an object. must_let!(let ConvexValue::Object(obj) = t.mutation( "basic:insertObject", @@ -382,8 +380,7 @@ async fn test_replace(rt: TestRuntime) -> anyhow::Result<()> { #[convex_macro::test_runtime] async fn test_query_missing_table(rt: TestRuntime) -> anyhow::Result<()> { - // TODO: Port to isolate2 when we support the QueryManager. - UdfTest::run_test_with_isolate(rt, async move |t: UdfTestType| { + UdfTest::run_test_with_isolate2(rt, async move |t: UdfTestType| { // Tables are implicitly created when we insert the first record. // This means that query before that is querying a missing table. // A user will expect no results instead of an error here. @@ -395,8 +392,7 @@ async fn test_query_missing_table(rt: TestRuntime) -> anyhow::Result<()> { #[convex_macro::test_runtime] async fn test_time_constructor_args(rt: TestRuntime) -> anyhow::Result<()> { - // TODO: Port to isolate2 when we support JsErrors. - UdfTest::run_test_with_isolate(rt, async move |t: UdfTestType| { + UdfTest::run_test_with_isolate2(rt, async move |t: UdfTestType| { let ms_in: f64 = 1234567890123.0; must_let!(let ConvexValue::Float64(ms_out) = t.query("basic:createTimeMs", assert_obj!("args" => [ms_in] )).await?); assert_eq!(ms_in, ms_out);