From 76f55b738d8eed1e928ee9b078f9fc1aac57565a Mon Sep 17 00:00:00 2001 From: Sujay Jayakar Date: Fri, 12 Apr 2024 18:10:09 -0400 Subject: [PATCH] Wire the syscall providers into isolate2 (#24616) ## Open Source There's still a bunch of unimplemented functionality here, but it's enough to get a basic mutation working! GitOrigin-RevId: 649d89d154eeeb7be2bd4affb7ebd409cfa5facd --- .../src/environment/udf/async_syscall.rs | 19 +- crates/isolate/src/environment/udf/mod.rs | 4 +- crates/isolate/src/environment/udf/syscall.rs | 121 +++++---- crates/isolate/src/isolate2/runner.rs | 245 ++++++++++++++++-- crates/isolate/src/test_helpers.rs | 9 +- crates/isolate/src/tests/basic.rs | 4 +- 6 files changed, 319 insertions(+), 83 deletions(-) diff --git a/crates/isolate/src/environment/udf/async_syscall.rs b/crates/isolate/src/environment/udf/async_syscall.rs index 388018c1..1aa34d07 100644 --- a/crates/isolate/src/environment/udf/async_syscall.rs +++ b/crates/isolate/src/environment/udf/async_syscall.rs @@ -106,7 +106,7 @@ impl HeapSize for PendingSyscall { // Checks if the underlying table and the request's expectation for the table // line up. -fn system_table_guard(name: &TableName, expect_system_table: bool) -> anyhow::Result<()> { +pub fn system_table_guard(name: &TableName, expect_system_table: bool) -> anyhow::Result<()> { if expect_system_table && !name.is_system() { return Err(anyhow::anyhow!(ErrorMetadata::bad_request( "SystemTableError", @@ -185,6 +185,10 @@ impl AsyncSyscallBatch { Self::Unbatched { .. } => 1, } } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } pub struct QueryManager { @@ -221,7 +225,8 @@ impl QueryManager { } // Trait for allowing code reuse between `DatabaseUdfEnvironment` and isolate2. -pub trait SyscallProvider { +#[allow(async_fn_in_trait)] +pub trait AsyncSyscallProvider { fn rt(&self) -> &RT; fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata>; fn key_broker(&self) -> &KeyBroker; @@ -258,7 +263,7 @@ pub trait SyscallProvider { ) -> anyhow::Result>; } -impl SyscallProvider for DatabaseUdfEnvironment { +impl AsyncSyscallProvider for DatabaseUdfEnvironment { fn rt(&self) -> &RT { &self.phase.rt } @@ -361,11 +366,11 @@ impl SyscallProvider for DatabaseUdfEnvironment { /// /// Most of the common logic lives on `Transaction` or `DatabaseSyscallsShared`, /// and this is mostly just taking care of the argument parsing. -pub struct DatabaseSyscallsV1> { +pub struct DatabaseSyscallsV1> { _pd: PhantomData<(RT, P)>, } -impl> DatabaseSyscallsV1 { +impl> DatabaseSyscallsV1 { /// Runs a batch of syscalls, each of which can succeed or fail /// independently. The returned vec is the same length as the batch. #[minitrace::trace] @@ -893,7 +898,7 @@ impl> DatabaseSyscallsV1 { } } -struct DatabaseSyscallsShared> { +struct DatabaseSyscallsShared> { _pd: PhantomData<(RT, P)>, } @@ -923,7 +928,7 @@ struct QueryPageMetadata { page_status: Option, } -impl> DatabaseSyscallsShared { +impl> DatabaseSyscallsShared { async fn read_page_from_query( mut query: CompiledQuery, tx: &mut Transaction, diff --git a/crates/isolate/src/environment/udf/mod.rs b/crates/isolate/src/environment/udf/mod.rs index 19eacf5e..7378199d 100644 --- a/crates/isolate/src/environment/udf/mod.rs +++ b/crates/isolate/src/environment/udf/mod.rs @@ -8,11 +8,11 @@ use model::environment_variables::types::{ EnvVarName, EnvVarValue, }; -mod async_syscall; +pub mod async_syscall; pub mod outcome; mod phase; -mod syscall; +pub mod syscall; use std::{ cmp::Ordering, collections::VecDeque, diff --git a/crates/isolate/src/environment/udf/syscall.rs b/crates/isolate/src/environment/udf/syscall.rs index 1cad2907..e8f4eca5 100644 --- a/crates/isolate/src/environment/udf/syscall.rs +++ b/crates/isolate/src/environment/udf/syscall.rs @@ -1,5 +1,3 @@ -#![allow(non_snake_case)] - use std::convert::TryFrom; use anyhow::Context; @@ -11,6 +9,7 @@ use common::{ use database::{ query::TableFilter, DeveloperQuery, + Transaction, }; use errors::ErrorMetadata; use serde::{ @@ -27,9 +26,9 @@ use value::{ TableName, }; -use super::async_syscall::{ - DatabaseSyscallsV1, - SyscallProvider, +use super::{ + async_syscall::QueryManager, + DatabaseUdfEnvironment, }; use crate::environment::helpers::{ parse_version, @@ -37,15 +36,39 @@ use crate::environment::helpers::{ ArgName, }; +pub trait SyscallProvider { + fn table_filter(&self) -> TableFilter; + fn query_manager(&mut self) -> &mut QueryManager; + fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata>; +} + +impl SyscallProvider for DatabaseUdfEnvironment { + fn table_filter(&self) -> TableFilter { + if self.udf_path.is_system() { + TableFilter::IncludePrivateSystemTables + } else { + TableFilter::ExcludePrivateSystemTables + } + } + + fn query_manager(&mut self) -> &mut QueryManager { + &mut self.query_manager + } + + fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata> { + self.phase.tx() + } +} + pub fn syscall_impl>( provider: &mut P, name: &str, args: JsonValue, ) -> anyhow::Result { match name { - "1.0/queryCleanup" => DatabaseSyscallsV1::syscall_queryCleanup(provider, args), - "1.0/queryStream" => DatabaseSyscallsV1::syscall_queryStream(provider, args), - "1.0/db/normalizeId" => syscall_normalizeId(provider, args), + "1.0/queryCleanup" => syscall_query_cleanup(provider, args), + "1.0/queryStream" => syscall_query_stream(provider, args), + "1.0/db/normalizeId" => syscall_normalize_id(provider, args), #[cfg(test)] "throwSystemError" => anyhow::bail!("I can't go for that."), @@ -73,7 +96,7 @@ pub fn syscall_impl>( } } -fn syscall_normalizeId>( +fn syscall_normalize_id>( provider: &mut P, args: JsonValue, ) -> anyhow::Result { @@ -128,48 +151,52 @@ fn syscall_normalizeId>( } } -impl> DatabaseSyscallsV1 { - fn syscall_queryStream(provider: &mut P, args: JsonValue) -> anyhow::Result { - let _s: common::tracing::NoopSpan = static_span!(); - let table_filter = provider.table_filter(); - let tx = provider.tx()?; +fn syscall_query_stream>( + provider: &mut P, + args: JsonValue, +) -> anyhow::Result { + let _s: common::tracing::NoopSpan = static_span!(); + let table_filter = provider.table_filter(); + let tx = provider.tx()?; - #[derive(Deserialize)] - struct QueryStreamArgs { - query: JsonValue, - version: Option, - } - let (parsed_query, version) = with_argument_error("queryStream", || { - let args: QueryStreamArgs = serde_json::from_value(args)?; - let parsed_query = Query::try_from(args.query).context(ArgName("query"))?; - let version = parse_version(args.version)?; - Ok((parsed_query, version)) - })?; - // TODO: Are all invalid query pipelines developer errors? These could be bugs - // in convex/server. - let compiled_query = - { DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? }; - let query_id = provider.query_manager().put_developer(compiled_query); + #[derive(Deserialize)] + struct QueryStreamArgs { + query: JsonValue, + version: Option, + } + let (parsed_query, version) = with_argument_error("queryStream", || { + let args: QueryStreamArgs = serde_json::from_value(args)?; + let parsed_query = Query::try_from(args.query).context(ArgName("query"))?; + let version = parse_version(args.version)?; + Ok((parsed_query, version)) + })?; + // TODO: Are all invalid query pipelines developer errors? These could be bugs + // in convex/server. + let compiled_query = + { DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? }; + let query_id = provider.query_manager().put_developer(compiled_query); - #[derive(Serialize)] - #[serde(rename_all = "camelCase")] - struct QueryStreamResult { - query_id: u32, - } - Ok(serde_json::to_value(QueryStreamResult { query_id })?) + #[derive(Serialize)] + #[serde(rename_all = "camelCase")] + struct QueryStreamResult { + query_id: u32, } + Ok(serde_json::to_value(QueryStreamResult { query_id })?) +} - fn syscall_queryCleanup(provider: &mut P, args: JsonValue) -> anyhow::Result { - let _s = static_span!(); +fn syscall_query_cleanup>( + provider: &mut P, + args: JsonValue, +) -> anyhow::Result { + let _s = static_span!(); - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct QueryCleanupArgs { - query_id: u32, - } - let args: QueryCleanupArgs = - with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?; - let cleaned_up = provider.query_manager().cleanup_developer(args.query_id); - Ok(serde_json::to_value(cleaned_up)?) + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct QueryCleanupArgs { + query_id: u32, } + let args: QueryCleanupArgs = + with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?; + let cleaned_up = provider.query_manager().cleanup_developer(args.query_id); + Ok(serde_json::to_value(cleaned_up)?) } diff --git a/crates/isolate/src/isolate2/runner.rs b/crates/isolate/src/isolate2/runner.rs index cfa2704f..640344ce 100644 --- a/crates/isolate/src/isolate2/runner.rs +++ b/crates/isolate/src/isolate2/runner.rs @@ -5,19 +5,28 @@ use std::{ }; use common::{ + execution_context::ExecutionContext, log_lines::{ LogLevel, LogLine, LogLines, }, + query_journal::QueryJournal, runtime::{ Runtime, SpawnHandle, UnixTimestamp, }, - types::UdfType, + types::{ + PersistenceVersion, + UdfType, + }, +}; +use database::{ + query::TableFilter, + Transaction, }; -use database::Transaction; +use errors::ErrorMetadata; use futures::{ channel::{ mpsc, @@ -26,6 +35,7 @@ use futures::{ FutureExt, StreamExt, }; +use keybroker::KeyBroker; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use serde_json::Value as JsonValue; @@ -53,12 +63,26 @@ use super::{ }; use crate::{ client::initialize_v8, - environment::helpers::{ - module_loader::{ - module_specifier_from_path, - path_from_module_specifier, + environment::{ + helpers::{ + module_loader::{ + module_specifier_from_path, + path_from_module_specifier, + }, + MAX_LOG_LINES, + }, + udf::{ + async_syscall::{ + AsyncSyscallBatch, + AsyncSyscallProvider, + DatabaseSyscallsV1, + QueryManager, + }, + syscall::{ + syscall_impl, + SyscallProvider, + }, }, - MAX_LOG_LINES, }, ModuleLoader, }; @@ -147,12 +171,23 @@ impl UdfEnvironment { } } +impl SyscallProvider for UdfEnvironment { + fn table_filter(&self) -> TableFilter { + todo!(); + } + + fn query_manager(&mut self) -> &mut QueryManager { + todo!(); + } + + fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata> { + todo!(); + } +} + impl Environment for UdfEnvironment { - fn syscall(&mut self, op: &str, args: JsonValue) -> anyhow::Result { - if op == "echo" { - return Ok(args); - } - anyhow::bail!("Syscall not implemented") + fn syscall(&mut self, name: &str, args: JsonValue) -> anyhow::Result { + syscall_impl(self, name, args) } fn trace( @@ -221,9 +256,11 @@ impl Environment for UdfEnvironment { } async fn run_request( - client: &mut IsolateThreadClient, - mut tx: Transaction, + rt: RT, + tx: &mut Transaction, module_loader: Arc>, + unix_timestamp: UnixTimestamp, + client: &mut IsolateThreadClient, udf_type: UdfType, udf_path: CanonicalizedUdfPath, args: ConvexObject, @@ -232,10 +269,7 @@ async fn run_request( while let Some(module_path) = stack.pop() { let module_specifier = module_specifier_from_path(&module_path)?; - let Some(module_metadata) = module_loader - .get_module(&mut tx, module_path.clone()) - .await? - else { + let Some(module_metadata) = module_loader.get_module(tx, module_path.clone()).await? else { anyhow::bail!("Module not found: {module_path:?}") }; let requests = client @@ -251,6 +285,16 @@ async fn run_request( client.evaluate_module(udf_module_specifier.clone()).await?; + let mut provider = Isolate2SyscallProvider { + tx, + rt, + query_manager: QueryManager::new(), + unix_timestamp, + prev_journal: QueryJournal::new(), + next_journal: QueryJournal::new(), + is_system: udf_path.is_system(), + }; + let (function_id, mut result) = client .start_function( udf_type, @@ -267,19 +311,164 @@ async fn run_request( }; let mut completions = vec![]; + + let mut syscall_batch = None; + let mut batch_promise_ids = vec![]; + for async_syscall in async_syscalls { let promise_id = async_syscall.promise_id; - let result = Ok(JsonValue::from(1)); - completions.push(AsyncSyscallCompletion { promise_id, result }); + match syscall_batch { + None => { + syscall_batch = Some(AsyncSyscallBatch::new( + async_syscall.name, + async_syscall.args, + )); + assert!(batch_promise_ids.is_empty()); + batch_promise_ids.push(promise_id); + }, + Some(ref mut batch) if batch.can_push(&async_syscall.name, &async_syscall.args) => { + batch.push(async_syscall.name, async_syscall.args)?; + batch_promise_ids.push(promise_id); + }, + Some(batch) => { + let results = + DatabaseSyscallsV1::run_async_syscall_batch(&mut provider, batch).await?; + assert_eq!(results.len(), batch_promise_ids.len()); + + for (promise_id, result) in batch_promise_ids.drain(..).zip(results) { + // TODO: Avoid reparsing the result here. + let result: JsonValue = serde_json::from_str(&(result?))?; + completions.push(AsyncSyscallCompletion { + promise_id, + result: Ok(result), + }); + } + + syscall_batch = None; + }, + } } + if let Some(batch) = syscall_batch { + let results = DatabaseSyscallsV1::run_async_syscall_batch(&mut provider, batch).await?; + assert_eq!(results.len(), batch_promise_ids.len()); + + for (promise_id, result) in batch_promise_ids.into_iter().zip(results) { + // TODO: Avoid reparsing the result here. + let result: JsonValue = serde_json::from_str(&(result?))?; + completions.push(AsyncSyscallCompletion { + promise_id, + result: Ok(result), + }); + } + } + result = client.poll_function(function_id, completions).await?; } } +struct Isolate2SyscallProvider<'a, RT: Runtime> { + tx: &'a mut Transaction, + rt: RT, + + query_manager: QueryManager, + + unix_timestamp: UnixTimestamp, + + prev_journal: QueryJournal, + next_journal: QueryJournal, + + is_system: bool, +} + +impl<'a, RT: Runtime> AsyncSyscallProvider for Isolate2SyscallProvider<'a, RT> { + fn rt(&self) -> &RT { + &self.rt + } + + fn tx(&mut self) -> Result<&mut Transaction, ErrorMetadata> { + // TODO: phases. + Ok(self.tx) + } + + fn key_broker(&self) -> &KeyBroker { + todo!() + } + + fn context(&self) -> &ExecutionContext { + todo!() + } + + fn unix_timestamp(&self) -> anyhow::Result { + // TODO: phases. + Ok(self.unix_timestamp) + } + + fn persistence_version(&self) -> PersistenceVersion { + todo!() + } + + fn table_filter(&self) -> TableFilter { + if self.is_system { + TableFilter::IncludePrivateSystemTables + } else { + TableFilter::ExcludePrivateSystemTables + } + } + + fn log_async_syscall(&mut self, _name: String, _duration: Duration, _is_success: bool) {} + + fn query_manager(&mut self) -> &mut QueryManager { + &mut self.query_manager + } + + fn prev_journal(&mut self) -> &mut QueryJournal { + &mut self.prev_journal + } + + fn next_journal(&mut self) -> &mut QueryJournal { + &mut self.next_journal + } + + async fn validate_schedule_args( + &mut self, + _udf_path: UdfPath, + _args: Vec, + _scheduled_ts: UnixTimestamp, + ) -> anyhow::Result<(UdfPath, value::ConvexArray)> { + todo!() + } + + fn file_storage_generate_upload_url(&self) -> anyhow::Result { + todo!() + } + + async fn file_storage_get_url( + &mut self, + _storage_id: model::file_storage::FileStorageId, + ) -> anyhow::Result> { + todo!() + } + + async fn file_storage_delete( + &mut self, + _storage_id: model::file_storage::FileStorageId, + ) -> anyhow::Result<()> { + todo!() + } + + async fn file_storage_get_entry( + &mut self, + _storage_id: model::file_storage::FileStorageId, + ) -> anyhow::Result> { + todo!() + } +} + async fn tokio_thread( rt: RT, - tx: Transaction, + mut tx: Transaction, module_loader: Arc>, + unix_timestamp: UnixTimestamp, mut client: IsolateThreadClient, total_timeout: Duration, mut sender: oneshot::Sender>, @@ -287,8 +476,19 @@ async fn tokio_thread( udf_path: CanonicalizedUdfPath, args: ConvexObject, ) { + let request = run_request( + rt.clone(), + &mut tx, + module_loader, + unix_timestamp, + &mut client, + udf_type, + udf_path, + args, + ); + let r = futures::select_biased! { - r = run_request(&mut client, tx, module_loader, udf_type, udf_path, args).fuse() => r, + r = request.fuse() => r, // Eventually we'll attempt to cleanup the isolate thread in these conditions. _ = rt.wait(total_timeout) => Err(anyhow::anyhow!("Total timeout exceeded")), @@ -340,6 +540,7 @@ pub async fn run_isolate_v2_udf( rt.clone(), tx, module_loader, + unix_timestamp, client, total_timeout, sender, diff --git a/crates/isolate/src/test_helpers.rs b/crates/isolate/src/test_helpers.rs index 4dd30eab..5b9c8b51 100644 --- a/crates/isolate/src/test_helpers.rs +++ b/crates/isolate/src/test_helpers.rs @@ -499,9 +499,11 @@ impl UdfTest { if let Ok(ref packed_result) = outcome.result && self.isolate_v2_enabled { - let result = packed_result.unpack(); + let _result = packed_result.unpack(); let tx = self.database.begin(identity.clone()).await?; - let v2_result = run_isolate_v2_udf( + // We can't actually compare the results since they're not pinning + // the same runtime state. Just check that the UDF completes for now. + let _v2_result = run_isolate_v2_udf( self.rt.clone(), tx, Arc::new(TransactionModuleLoader), @@ -513,7 +515,6 @@ impl UdfTest { .try_into()?, ) .await?; - anyhow::ensure!(result == v2_result); } self.database @@ -658,7 +659,7 @@ impl UdfTest { .try_into()?, ) .await?; - anyhow::ensure!(result == v2_result); + anyhow::ensure!(result == v2_result, "{result} != {v2_result}"); } Ok(query_outcome) diff --git a/crates/isolate/src/tests/basic.rs b/crates/isolate/src/tests/basic.rs index 246918d4..3b5a22a0 100644 --- a/crates/isolate/src/tests/basic.rs +++ b/crates/isolate/src/tests/basic.rs @@ -61,7 +61,9 @@ async fn test_javascript(rt: TestRuntime) -> anyhow::Result<()> { #[convex_macro::test_runtime] async fn test_insert_object(rt: TestRuntime) -> anyhow::Result<()> { - let t = UdfTest::default(rt).await?; + let mut t = UdfTest::default(rt).await?; + t.enable_isolate_v2(); + let values = [ assert_val!(10), assert_val!(-0.),