diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 553c7ff685c..3b9ca18ddd5 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -29,12 +29,12 @@ use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRo use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::subscription::{execute_plan, execute_plan_for_view}; -use crate::util::jobs::SingleCoreExecutor; +use crate::util::jobs::{AllocatedJobCore, SingleCoreExecutor, SingleThreadedExecutor}; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; use bytes::Bytes; use derive_more::From; -use futures::lock::Mutex; +use futures::lock::Mutex as AsyncMutex; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, HistogramTimer, IntGauge}; @@ -341,7 +341,10 @@ impl ReducersMap { pub enum ModuleWithInstance { Wasm { module: super::wasmtime::Module, - executor: SingleCoreExecutor, + procedure_module: super::wasmtime::ProcedureModule, + main_thread_name: String, + procedure_thread_name: String, + core: AllocatedJobCore, init_inst: Box, procedure_instance_pool_size: NonZeroUsize, }, @@ -370,63 +373,78 @@ impl Drop for CallTimerGuard { } } -type WasmtimeInstanceManager = ModuleInstanceManager>; +type WasmtimeProcedureInstanceManager = ModuleInstanceManager>; -/// Wasm uses one instance manager for reducers/views and one for procedures. -/// -/// Both managers share the compiled module via `Arc` so either manager can -/// create replacement instances. Main-lane instance checkout happens after the -/// job is queued on the [`SingleCoreExecutor`], so concurrent websocket callers -/// enqueue first instead of racing to allocate multiple main instances. -struct WasmtimeModuleHost { - executor: SingleCoreExecutor, - main_instance: Arc, - procedure_instances: Arc, +struct WasmtimeModuleState { + instance: Box, + module: Arc, + metrics: InstanceManagerMetrics, } -impl WasmtimeModuleHost { - fn instance_manager(&self, kind: InstanceKind) -> Arc { - match kind { - InstanceKind::Main => self.main_instance.clone(), - InstanceKind::Procedure => self.procedure_instances.clone(), +impl WasmtimeModuleState { + fn new( + module: Arc, + init_inst: Box, + metrics: InstanceManagerMetrics, + ) -> Self { + metrics.track_initial_instance(); + Self { + instance: init_inst, + module, + metrics, } } - fn enqueue_job(&self, label: &str, on_panic: Arc, timer_guard: CallTimerGuard, f: F) - where - F: AsyncFnOnce() + Send + 'static, - { - let label = label.to_owned(); - self.executor.enqueue_job(async move || { - scopeguard::defer_on_unwind!({ - log::warn!("wasm job {label} panicked"); - on_panic(); - }); - - drop(timer_guard); - f().await; - }); + fn with_instance(&mut self, f: impl FnOnce(&mut ModuleInstance) -> R) -> R { + let res = f(self.instance.as_mut()); + if self.instance.needs_replacement() { + self.metrics.track_instance_removed(); + let start_time = Instant::now(); + *self.instance = self.module.as_ref().create_instance(); + self.metrics.observe_instance_created(start_time.elapsed()); + } + res } +} - fn enqueue_with_instance( +/// Wasm uses a single-core executor backed by a Tokio single threaded runtime +/// for async procedures. It uses an executor backed by a single OS-thread for +/// everything else. +/// +/// Note, procedures acquire a module instance from the async procedure pool +/// before being enqueued by the executor. +/// +/// Reducers are not executed concurrently, and so there is no pool from which +/// to acquire. +struct WasmtimeModuleHost { + module: Arc, + main_executor: SingleThreadedExecutor, + procedure_executor: SingleCoreExecutor, + procedure_instances: Arc, +} + +impl WasmtimeModuleHost { + fn enqueue_with_main_instance( &self, label: &str, on_panic: Arc, timer_guard: CallTimerGuard, - instance_kind: InstanceKind, arg: A, - wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, + wasm: impl FnOnce(A, &mut ModuleInstance) + Send + 'static, ) where A: Send + 'static, { - let instance_manager = self.instance_manager(instance_kind); - self.enqueue_job(label, on_panic, timer_guard, async move || { - instance_manager - .with_instance(async move |mut inst| { - wasm(arg, &mut inst).await; - ((), inst) - }) - .await; + let label = label.to_owned(); + self.main_executor.enqueue_job(move |state| { + scopeguard::defer_on_unwind!({ + log::warn!("wasm main operation {label} panicked"); + on_panic(); + }); + + state.with_instance(move |inst| { + drop(timer_guard); + wasm(arg, inst); + }); }); } @@ -440,10 +458,10 @@ impl WasmtimeModuleHost { ) where A: Send + 'static, { - let instance_manager = self.instance_manager(InstanceKind::Procedure); + let instance_manager = self.procedure_instances.clone(); let ModuleInstanceLease { instance, slot } = instance_manager.get_instance().await; let label = label.to_owned(); - self.executor.enqueue_job(async move || { + self.procedure_executor.enqueue_job(async move || { scopeguard::defer_on_unwind!({ log::warn!("wasm procedure {label} panicked"); on_panic(); @@ -465,12 +483,6 @@ struct V8ModuleHost { procedure_instances: ModuleInstanceManager, } -#[derive(Clone, Copy)] -enum InstanceKind { - Main, - Procedure, -} - /// A module; used as a bound on `InstanceManager`. trait GenericModule { type Instance: GenericModuleInstance; @@ -512,6 +524,16 @@ impl GenericModule for Arc { } } +impl GenericModule for Arc { + type Instance = Box; + async fn create_instance(&self) -> Self::Instance { + Box::new((**self).create_instance()) + } + fn host_type(&self) -> HostType { + HostType::Wasm + } +} + impl GenericModule for super::v8::JsModule { type Instance = super::v8::JsProcedureInstance; async fn create_instance(&self) -> Self::Instance { @@ -1106,7 +1128,7 @@ impl CallProcedureParams { /// sandboxed instance and multiple procedures can run concurrently with up to /// one reducer. struct ModuleInstanceManager { - instances: Mutex>, + instances: AsyncMutex>, module: M, metrics: InstanceManagerMetrics, instance_slots: Option>, @@ -1251,10 +1273,6 @@ impl CreateInstanceTimeMetric { } impl ModuleInstanceManager { - fn new_with_metrics(module: M, init_inst: Option, metrics: InstanceManagerMetrics) -> Self { - Self::new_inner(module, init_inst, metrics, None) - } - fn new_bounded_with_metrics( module: M, init_inst: Option, @@ -1284,7 +1302,7 @@ impl ModuleInstanceManager { } Self { - instances: Mutex::new(instances), + instances: AsyncMutex::new(instances), module, metrics, instance_slots: max_instances.map(|max_instances| Arc::new(Semaphore::new(max_instances.get()))), @@ -1576,7 +1594,7 @@ macro_rules! call_instance { .call( $label, $arg, - async |$wasm_arg, $wasm_inst| $wasm, + |$wasm_arg, $wasm_inst| $wasm, async |$js_arg, $js_inst| $js, ) .await @@ -1612,27 +1630,37 @@ impl ModuleHost { let inner = match module { ModuleWithInstance::Wasm { module, - executor, + procedure_module, + main_thread_name, + procedure_thread_name, + core, init_inst, procedure_instance_pool_size, } => { info = module.info(); let module = Arc::new(module); + let procedure_module = Arc::new(procedure_module); let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); - let main_instance = Arc::new(ModuleInstanceManager::new_with_metrics( - module.clone(), - Some(init_inst), - metrics.clone(), - )); + let main_state = WasmtimeModuleState::new(module.clone(), init_inst, metrics.clone()); + let (load_balance_guard, core_pinner) = core.into_shared(); + let main_executor = AllocatedJobCore::spawn_executor( + load_balance_guard.clone(), + core_pinner.clone(), + main_state, + main_thread_name, + ); + let procedure_executor = + AllocatedJobCore::spawn_async_executor(load_balance_guard, core_pinner, procedure_thread_name); let procedure_instances = Arc::new(ModuleInstanceManager::new_bounded_with_metrics( - module, + procedure_module, None, metrics, procedure_instance_pool_size, )); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { - executor, - main_instance, + module, + main_executor, + procedure_executor, procedure_instances, }))) } @@ -1719,13 +1747,12 @@ impl ModuleHost { /// Run a function for this module which has access to the module instance. /// - /// For WASM, the function is run on the module's JobThread. - /// For V8/JS, the function is run in the current task. + /// The function is run on the module's worker thread for both WASM and V8. async fn call( &self, label: &str, arg: A, - wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + wasm: impl FnOnce(A, &mut ModuleInstance) -> R + Send + 'static, js: impl AsyncFnOnce(A, &JsMainInstance) -> R, ) -> Result where @@ -1742,17 +1769,13 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - let executor = host.executor.clone(); - let instance_manager = host.instance_manager(InstanceKind::Main); + let executor = host.main_executor.clone(); executor - .run_job(async move || { - drop(timer_guard); - instance_manager - .with_instance(async move |mut inst| { - let res = wasm(arg, &mut inst).await; - (res, inst) - }) - .await + .run_job(move |state| { + state.with_instance(move |inst| { + drop(timer_guard); + wasm(arg, inst) + }) }) .await } @@ -1790,8 +1813,8 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - let executor = host.executor.clone(); - let instance_manager = host.instance_manager(InstanceKind::Procedure); + let executor = host.procedure_executor.clone(); + let instance_manager = host.procedure_instances.clone(); instance_manager .with_instance(async move |mut inst| { executor @@ -1860,23 +1883,21 @@ impl ModuleHost { ) -> Result, DBError> { let metric = cmd.metric(); - let info = self.info.clone(); self.enqueue_main_operation( "websocket view operation", label, (cmd, metric), |(cmd, metric), inst, on_panic| async move { inst.enqueue_call_view(cmd, metric, on_panic).await }, move |(cmd, metric), wasm_host, on_panic, timer_guard| { - let info = info.clone(); - wasm_host.enqueue_with_instance( + let info = wasm_host.module.info(); + wasm_host.enqueue_with_main_instance( label, on_panic, timer_guard, - InstanceKind::Main, - cmd, - async move |cmd, inst| { + (cmd, metric), + move |(cmd, metric), inst| { let result = inst.call_view(cmd); - Self::record_view_command_round_trip(&info, metric); + ModuleHost::record_view_command_round_trip(&info, metric); if let Err(err) = result { log::warn!("websocket view operation failed: {err:#}"); } @@ -2297,13 +2318,12 @@ impl ModuleHost { call.params, |params, inst, on_panic| async move { inst.enqueue_reducer(params, on_panic).await }, move |params, wasm_host, on_panic, timer_guard| { - wasm_host.enqueue_with_instance( + wasm_host.enqueue_with_main_instance( &reducer_label, on_panic, timer_guard, - InstanceKind::Main, params, - async |params, inst| { + move |params, inst| { let _ = inst.call_reducer(params); }, ); @@ -2729,7 +2749,7 @@ impl ModuleHost { self, "scheduled reducer", params, - |params, inst| inst.call_scheduled_function(params).await, + |params, inst| inst.call_scheduled_reducer(params), |params, inst| inst.call_scheduled_reducer(params).await, ) .map_err(Into::into) @@ -2743,7 +2763,7 @@ impl ModuleHost { self, "scheduled procedure", params, - |params, inst| inst.call_scheduled_function(params).await, + |params, inst| inst.call_scheduled_procedure(params).await, |params, inst| inst.call_scheduled_procedure(params).await, ) .map_err(Into::into) @@ -3062,21 +3082,28 @@ impl ModuleHost { async fn one_off_query_with_params(&self, request: OneOffQueryRequest) -> Result<(), anyhow::Error> { let label = request.label(); - let timer = request.timer(); - let info = self.info.clone(); self.enqueue_main_operation( "websocket one-off query operation", label, request, |request, inst, on_panic| async move { inst.enqueue_one_off_query(request, on_panic).await }, move |request, wasm_host, on_panic, timer_guard| { - let info = info.clone(); - wasm_host.enqueue_job(label, on_panic, timer_guard, async move || { - let result = request.run(); - Self::record_one_off_query_round_trip(&info, timer); - if let Err(err) = result { - log::warn!("One-off query failed: {err:#}"); + let executor = wasm_host.main_executor.clone(); + let info = wasm_host.module.info(); + let label = label.to_owned(); + executor.enqueue_job(move |_| { + scopeguard::defer_on_unwind!({ + log::warn!("websocket one-off query operation {label} panicked"); + on_panic(); + }); + + drop(timer_guard); + let timer = request.timer(); + let res = request.run(); + if let Err(err) = &res { + log::warn!("detached one-off query failed: {err:#}"); } + ModuleHost::record_one_off_query_round_trip(&info, timer); }); Ok(()) }, @@ -3379,14 +3406,14 @@ impl ModuleHost { pub(crate) fn replica_ctx(&self) -> &ReplicaContext { match &*self.inner { - ModuleHostInner::Wasm(wasm) => wasm.main_instance.module.replica_ctx(), + ModuleHostInner::Wasm(wasm) => wasm.module.replica_ctx(), ModuleHostInner::Js(js) => js.module.replica_ctx(), } } fn scheduler(&self) -> &Scheduler { match &*self.inner { - ModuleHostInner::Wasm(wasm) => wasm.main_instance.module.scheduler(), + ModuleHostInner::Wasm(wasm) => wasm.module.scheduler(), ModuleHostInner::Js(js) => js.module.scheduler(), } } diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 0e68e7c792e..f001a19ab53 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -17,7 +17,7 @@ use spacetimedb_datastore::system_tables::{StScheduledFields, ST_SCHEDULED_ID}; use spacetimedb_datastore::traits::IsolationLevel; use spacetimedb_lib::scheduler::ScheduleAt; use spacetimedb_lib::Timestamp; -use spacetimedb_primitives::{ColId, FunctionId, TableId}; +use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_sats::bsatn::ToBsatn as _; use spacetimedb_sats::AlgebraicValue; use spacetimedb_table::table::RowRef; @@ -292,6 +292,11 @@ enum QueueItem { #[derive(Clone)] pub(crate) struct ScheduledFunctionParams(QueueItem); +enum ScheduledFunctionKind { + Reducer, + Procedure, +} + impl ScheduledFunctionParams { fn function_name(&self) -> &str { match &self.0 { @@ -300,8 +305,12 @@ impl ScheduledFunctionParams { } } - pub(crate) fn is_procedure(&self, module: &ModuleInfo) -> bool { - module.module_def.procedure_full(self.function_name()).is_some() + fn kind(&self, module: &ModuleInfo) -> ScheduledFunctionKind { + if module.module_def.procedure_full(self.function_name()).is_some() { + ScheduledFunctionKind::Procedure + } else { + ScheduledFunctionKind::Reducer + } } } @@ -377,10 +386,9 @@ impl SchedulerActor { }; let params = ScheduledFunctionParams(item.clone()); - let result = if params.is_procedure(module_host.info()) { - module_host.call_scheduled_procedure(params).await - } else { - module_host.call_scheduled_reducer(params).await + let result = match params.kind(module_host.info()) { + ScheduledFunctionKind::Procedure => module_host.call_scheduled_procedure(params).await, + ScheduledFunctionKind::Reducer => module_host.call_scheduled_reducer(params).await, }; match result { @@ -421,109 +429,28 @@ struct Reschedule { at_real: Instant, } -pub(super) async fn call_scheduled_function( +enum ScheduledProcedureStep { + Done(CallScheduledFunctionResult, bool), + Procedure { + params: CallProcedureParams, + reschedule: Option, + }, +} + +pub(super) async fn call_scheduled_procedure( module_info: &ModuleInfo, params: ScheduledFunctionParams, inst_common: &mut InstanceCommon, inst: &mut impl WasmInstance, ) -> (CallScheduledFunctionResult, bool) { - let ScheduledFunctionParams(item) = params; - - let id = match &item { - QueueItem::Id { id, .. } => Some(*id), - QueueItem::VolatileNonatomicImmediate { .. } => None, - }; - let db = &**module_info.relational_db(); - - enum Function { - Reducer(CallScheduledFunctionResult, bool), - Procedure { - params: CallProcedureParams, - reschedule: Option, - }, - } - - let next_step = { - let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); - - // Determine the call params. - // This also lets us know whether to call a reducer or procedure. - let params = call_params_for_queued_item(module_info, db, &tx, item); - let (timestamp, instant, params) = match params { - // If the function was already deleted, leave the `ScheduledFunction` - // in the database for when the module restarts. - Ok(None) => return (CallScheduledFunctionResult { reschedule: None }, false), - Ok(Some(params)) => params, - Err(err) => { - // All we can do here is log an error. - log::error!("could not determine scheduled function or its parameters: {err:#}"); - let reschedule = delete_scheduled_function_row(module_info, db, id, Some(tx), None, inst_common, inst); - return (CallScheduledFunctionResult { reschedule }, false); - } - }; - - // We've determined whether we have a reducer or procedure. - // The logic between them will now split, - // as for scheduled procedures, it's incorrect to retry them if execution aborts midway, - // so we must remove the schedule row before executing. - match params { - CallParams::Reducer(params) => { - // Patch the transaction context with ReducerContext so the commitlog - // records the reducer's name, caller, timestamp, and arguments. - // - // Background: Scheduled reducers start with Workload::Internal, but - // call_reducer_with_tx only sets ReducerContext when tx is None. - // Since we pass Some(tx), we must set it here. - let reducer_name = &module_info.module_def.reducer_by_id(params.reducer_id).name; - tx.ctx = ExecutionContext::with_workload( - tx.ctx.database_identity(), - Workload::Reducer(ReducerContext { - name: reducer_name.clone(), - caller_identity: params.caller_identity, - caller_connection_id: params.caller_connection_id, - timestamp: params.timestamp, - arg_bsatn: params.args.get_bsatn().clone(), - }), - ); - - // We don't want a panic in the module host to affect the scheduler, as unlikely - // as it might be, so catch it so we can handle it "gracefully". Panics will - // print their message and backtrace when they occur, so we don't need to do - // anything with the error payload. - let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - inst_common.call_reducer_with_tx(Some(tx), params, inst) - })); - let reschedule = delete_scheduled_function_row(module_info, db, id, None, None, inst_common, inst); - // Currently, we drop the return value from the function call. In the future, - // we might want to handle it somehow. - let trapped = match result { - Ok((_res, trapped)) => trapped, - Err(_err) => true, - }; - Function::Reducer(CallScheduledFunctionResult { reschedule }, trapped) - } - CallParams::Procedure(params) => { - // Delete scheduled row. - let reschedule = delete_scheduled_function_row( - module_info, - db, - id, - Some(tx), - Some((timestamp, instant)), - inst_common, - inst, - ); - Function::Procedure { params, reschedule } - } - } - }; + let next_step = prepare_scheduled_procedure_call(module_info, params, inst_common, inst); // Below code is outside of the DB transaction scope because the // compiler complains about holding mutable borrow across await point while calling a procedure, // even though it has been already moved during `delete_scheduled_function_row` call. match next_step { - Function::Reducer(result, trapped) => (result, trapped), - Function::Procedure { params, reschedule } => { + ScheduledProcedureStep::Done(result, trapped) => (result, trapped), + ScheduledProcedureStep::Procedure { params, reschedule } => { // Execute the procedure. See above for commentary on `catch_unwind()`. let result = panic::AssertUnwindSafe(inst_common.call_procedure(params, inst)) .catch_unwind() @@ -541,6 +468,139 @@ pub(super) async fn call_scheduled_function( } } +pub(super) fn call_scheduled_reducer( + module_info: &ModuleInfo, + params: ScheduledFunctionParams, + inst_common: &mut InstanceCommon, + inst: &mut impl WasmInstance, +) -> (CallScheduledFunctionResult, bool) { + call_scheduled_reducer_until_done(module_info, params, inst_common, inst) +} + +/// Prepares a scheduled procedure by resolving its arguments and deleting/rescheduling +/// the schedule row before execution. +/// +/// The actual procedure call is async, so this helper returns a procedure step for the +/// caller to await after the transaction scope has ended. +fn prepare_scheduled_procedure_call( + module_info: &ModuleInfo, + params: ScheduledFunctionParams, + inst_common: &mut InstanceCommon, + inst: &mut impl WasmInstance, +) -> ScheduledProcedureStep { + let ScheduledFunctionParams(item) = params; + let id = scheduled_item_id(&item); + let db = &**module_info.relational_db(); + let tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + + let params = procedure_call_params_for_queued_item(module_info, db, &tx, item); + let (timestamp, instant, params) = match params { + // If the function was already deleted, leave the `ScheduledFunction` + // in the database for when the module restarts. + Ok(None) => return ScheduledProcedureStep::Done(CallScheduledFunctionResult { reschedule: None }, false), + Ok(Some(params)) => params, + Err(err) => { + // All we can do here is log an error. + log::error!("could not determine scheduled procedure or its parameters: {err:#}"); + let reschedule = delete_scheduled_function_row(module_info, db, id, Some(tx), None, inst_common, inst); + return ScheduledProcedureStep::Done(CallScheduledFunctionResult { reschedule }, false); + } + }; + + // For scheduled procedures, it's incorrect to retry them if execution aborts midway, + // so we must remove the schedule row before executing. + let reschedule = delete_scheduled_function_row( + module_info, + db, + id, + Some(tx), + Some((timestamp, instant)), + inst_common, + inst, + ); + ScheduledProcedureStep::Procedure { params, reschedule } +} + +fn call_scheduled_reducer_until_done( + module_info: &ModuleInfo, + params: ScheduledFunctionParams, + inst_common: &mut InstanceCommon, + inst: &mut impl WasmInstance, +) -> (CallScheduledFunctionResult, bool) { + let ScheduledFunctionParams(item) = params; + let id = scheduled_item_id(&item); + let db = &**module_info.relational_db(); + let tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + + let params = reducer_call_params_for_queued_item(module_info, db, &tx, item); + let params = match params { + // If the function was already deleted, leave the `ScheduledFunction` + // in the database for when the module restarts. + Ok(None) => return (CallScheduledFunctionResult { reschedule: None }, false), + Ok(Some((_timestamp, _instant, params))) => params, + Err(err) => { + // All we can do here is log an error. + log::error!("could not determine scheduled reducer or its parameters: {err:#}"); + let reschedule = delete_scheduled_function_row(module_info, db, id, Some(tx), None, inst_common, inst); + return (CallScheduledFunctionResult { reschedule }, false); + } + }; + + call_scheduled_reducer_with_tx(module_info, db, id, tx, params, inst_common, inst) +} + +fn scheduled_item_id(item: &QueueItem) -> Option { + match item { + QueueItem::Id { id, .. } => Some(*id), + QueueItem::VolatileNonatomicImmediate { .. } => None, + } +} + +fn call_scheduled_reducer_with_tx( + module_info: &ModuleInfo, + db: &RelationalDB, + id: Option, + mut tx: MutTxId, + params: CallReducerParams, + inst_common: &mut InstanceCommon, + inst: &mut impl WasmInstance, +) -> (CallScheduledFunctionResult, bool) { + // Patch the transaction context with ReducerContext so the commitlog + // records the reducer's name, caller, timestamp, and arguments. + // + // Background: Scheduled reducers start with Workload::Internal, but + // call_reducer_with_tx only sets ReducerContext when tx is None. + // Since we pass Some(tx), we must set it here. + let reducer_name = &module_info.module_def.reducer_by_id(params.reducer_id).name; + tx.ctx = ExecutionContext::with_workload( + tx.ctx.database_identity(), + Workload::Reducer(ReducerContext { + name: reducer_name.clone(), + caller_identity: params.caller_identity, + caller_connection_id: params.caller_connection_id, + timestamp: params.timestamp, + arg_bsatn: params.args.get_bsatn().clone(), + }), + ); + + // We don't want a panic in the module host to affect the scheduler, as unlikely + // as it might be, so catch it so we can handle it "gracefully". Panics will + // print their message and backtrace when they occur, so we don't need to do + // anything with the error payload. + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { + inst_common.call_reducer_with_tx(Some(tx), params, inst) + })); + let reschedule = delete_scheduled_function_row(module_info, db, id, None, None, inst_common, inst); + // Currently, we drop the return value from the function call. In the future, + // we might want to handle it somehow. + let trapped = match result { + Ok((_res, trapped)) => trapped, + Err(_err) => true, + }; + + (CallScheduledFunctionResult { reschedule }, trapped) +} + /// Deletes a scheduled-row entry after its function runs, reusing `tx` when one is already /// open and otherwise creating an internal transaction for the cleanup. /// @@ -647,12 +707,36 @@ fn refresh_views_then_commit_and_broadcast( } } -fn call_params_for_queued_item( +fn reducer_call_params_for_queued_item( + module: &ModuleInfo, + db: &RelationalDB, + tx: &MutTxId, + item: QueueItem, +) -> anyhow::Result> { + call_params_for_queued_item(module, db, tx, item, function_to_reducer_call_params) +} + +fn procedure_call_params_for_queued_item( module: &ModuleInfo, db: &RelationalDB, tx: &MutTxId, item: QueueItem, -) -> anyhow::Result> { +) -> anyhow::Result> { + call_params_for_queued_item(module, db, tx, item, function_to_procedure_call_params) +} + +fn call_params_for_queued_item( + module: &ModuleInfo, + db: &RelationalDB, + tx: &MutTxId, + item: QueueItem, + function_to_call_params: impl FnOnce( + &ModuleInfo, + &str, + FunctionArgs, + Option, + ) -> anyhow::Result<(Timestamp, Instant, T)>, +) -> anyhow::Result> { Ok(Some(match item { QueueItem::Id { id, function_name, at } => { let Some(schedule_row) = get_schedule_row_mut(tx, db, id)? else { @@ -668,49 +752,51 @@ fn call_params_for_queued_item( })) } -enum CallParams { - Reducer(CallReducerParams), - Procedure(CallProcedureParams), +fn function_to_reducer_call_params( + module: &ModuleInfo, + name: &str, + args: FunctionArgs, + at: Option, +) -> anyhow::Result<(Timestamp, Instant, CallReducerParams)> { + let identity = module.database_identity; + + let module = &module.module_def; + let Some((id, def)) = module.reducer_full(name) else { + return Err(anyhow!("Reducer `{name}` not found")); + }; + let args = args.into_tuple_for_def(module, def).map_err(InvalidReducerArguments)?; + + let (ts, instant) = scheduled_call_time(at); + Ok((ts, instant, CallReducerParams::from_system(ts, identity, id, args))) } -/// Finds the function for `name` -/// and returns the appropriate call parameters -/// to call the function with `args`. -fn function_to_call_params( +fn function_to_procedure_call_params( module: &ModuleInfo, name: &str, args: FunctionArgs, at: Option, -) -> anyhow::Result<(Timestamp, Instant, CallParams)> { +) -> anyhow::Result<(Timestamp, Instant, CallProcedureParams)> { let identity = module.database_identity; - // Find the function and deserialize the arguments. let module = &module.module_def; - let (id, args) = if let Some((id, def)) = module.reducer_full(name) { - let args = args.into_tuple_for_def(module, def).map_err(InvalidReducerArguments)?; - (FunctionId::Reducer(id), args) - } else if let Some((id, def)) = module.procedure_full(name) { - let args = args - .into_tuple_for_def(module, def) - .map_err(InvalidProcedureArguments)?; - (FunctionId::Procedure(id), args) - } else { - // This should be impossible, but let's still return an error to log. - return Err(anyhow!("Reducer or procedure `{name}` not found")); + let Some((id, def)) = module.procedure_full(name) else { + return Err(anyhow!("Procedure `{name}` not found")); }; + let args = args + .into_tuple_for_def(module, def) + .map_err(InvalidProcedureArguments)?; + + let (ts, instant) = scheduled_call_time(at); + Ok((ts, instant, CallProcedureParams::from_system(ts, identity, id, args))) +} +fn scheduled_call_time(at: Option) -> (Timestamp, Instant) { // The timestamp we tell the function it's running at will be // at least the timestamp it was scheduled to run at. let now = Timestamp::now(); let ts = at.unwrap_or(now).max(now); let instant = Instant::now() + ts.duration_since(now).unwrap_or(Duration::ZERO); - - let params = match id { - FunctionId::Reducer(id) => CallParams::Reducer(CallReducerParams::from_system(ts, identity, id, args)), - FunctionId::Procedure(id) => CallParams::Procedure(CallProcedureParams::from_system(ts, identity, id, args)), - }; - - Ok((ts, instant, params)) + (ts, instant) } /// Returns the `schedule_row` for `id`. diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index db5b64bf5fc..413c34b5340 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -1362,10 +1362,7 @@ fn handle_main_worker_request( } JsMainWorkerRequest::ScheduledReducer { reply_tx, params } => { handle_worker_request("scheduled_reducer", reply_tx, || { - let (res, trapped) = instance_common - .call_scheduled_function(params, inst) - .now_or_never() - .expect("our call_scheduled_function implementation is not actually async"); + let (res, trapped) = instance_common.call_scheduled_reducer(params, inst); (res, trapped) }) } @@ -1463,9 +1460,9 @@ fn handle_procedure_worker_request( JsProcedureWorkerRequest::ScheduledProcedure { reply_tx, params } => { handle_worker_request("scheduled_procedure", reply_tx, || { let (res, trapped) = instance_common - .call_scheduled_function(params, inst) + .call_scheduled_procedure(params, inst) .now_or_never() - .expect("our call_scheduled_function implementation is not actually async"); + .expect("our call_scheduled_procedure implementation is not actually async"); (res, trapped) }) } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index b269bbb92e4..8751a26804d 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -394,6 +394,15 @@ impl WasmModuleHostActor { Ok((module, initial_instance)) } + + pub fn with_runtime_module(&self, module: U) -> Result, InitializationError> { + let module = module.instantiate_pre()?; + Ok(WasmModuleHostActor { + module, + common: self.common.clone(), + func_names: self.func_names.clone(), + }) + } } impl WasmModuleHostActor { @@ -528,11 +537,20 @@ impl WasmModuleInstance { res } - pub(in crate::host) async fn call_scheduled_function( + pub(in crate::host) async fn call_scheduled_procedure( &mut self, params: ScheduledFunctionParams, ) -> CallScheduledFunctionResult { - let (res, trapped) = self.common.call_scheduled_function(params, &mut self.instance).await; + let (res, trapped) = self.common.call_scheduled_procedure(params, &mut self.instance).await; + self.trapped = trapped; + res + } + + pub(in crate::host) fn call_scheduled_reducer( + &mut self, + params: ScheduledFunctionParams, + ) -> CallScheduledFunctionResult { + let (res, trapped) = self.common.call_scheduled_reducer(params, &mut self.instance); self.trapped = trapped; res } @@ -1363,12 +1381,20 @@ impl InstanceCommon { self.info.relational_db().clear_all_clients().map_err(Into::into) } - pub(crate) async fn call_scheduled_function( + pub(crate) async fn call_scheduled_procedure( + &mut self, + params: ScheduledFunctionParams, + inst: &mut I, + ) -> (CallScheduledFunctionResult, bool) { + crate::host::scheduler::call_scheduled_procedure(&self.info.clone(), params, self, inst).await + } + + pub(crate) fn call_scheduled_reducer( &mut self, params: ScheduledFunctionParams, inst: &mut I, ) -> (CallScheduledFunctionResult, bool) { - crate::host::scheduler::call_scheduled_function(&self.info.clone(), params, self, inst).await + crate::host::scheduler::call_scheduled_reducer(&self.info.clone(), params, self, inst) } } diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index e9396fc6d7e..fd227010944 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -11,7 +11,7 @@ use spacetimedb_paths::server::ServerDataDir; use std::borrow::Cow; use std::time::Duration; use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut}; -pub use wasmtime_module::{WasmtimeInstance, WasmtimeModule}; +pub use wasmtime_module::{WasmtimeAsyncModule, WasmtimeInstance, WasmtimeModule}; #[cfg(unix)] mod pooling_stack_creator; @@ -19,8 +19,10 @@ mod wasm_instance_env; mod wasmtime_module; pub struct WasmtimeRuntime { - engine: Engine, - linker: Box>, + sync_engine: Engine, + sync_linker: Box>, + async_engine: Engine, + async_linker: Box>, config: WasmConfig, } @@ -46,76 +48,106 @@ pub(crate) fn epoch_ticker(mut on_tick: impl 'static + Send + FnMut() -> Option< impl WasmtimeRuntime { pub fn new(data_dir: Option<&ServerDataDir>, runtime_config: WasmConfig) -> Self { - let mut config = wasmtime::Config::new(); - config - .cranelift_opt_level(wasmtime::OptLevel::Speed) - .consume_fuel(true) - .epoch_interruption(true) - .wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable) - // We need async support to enable suspending execution of procedures - // when waiting for e.g. HTTP responses or the transaction lock. - // We don't enable either fuel-based or epoch-based yielding - // (see https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.epoch_deadline_async_yield_and_update - // and https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.fuel_async_yield_interval) - // so reducers will always execute to completion during the first `Future::poll` call, - // and procedures will only yield when performing an asynchronous operation. - // These futures are executed on a separate single-threaded executor not related to the "global" Tokio runtime, - // which is responsible only for executing WASM. See `crate::util::jobs` for this infrastructure. - .async_support(true); + let sync_config = wasmtime_config(data_dir, false); + let async_config = wasmtime_config(data_dir, true); - #[cfg(unix)] - config - .async_stack_size(self::pooling_stack_creator::ASYNC_STACK_SIZE) - .with_host_stack(self::pooling_stack_creator::PoolingStackCreator::new()); + let sync_engine = Engine::new(&sync_config).unwrap(); + let async_engine = Engine::new(&async_config).unwrap(); - // Offer a compile-time flag for enabling perfmap generation, - // so `perf` can display JITted symbol names. - // Ideally we would be able to configure this at runtime via a flag to `spacetime start`, - // but this is good enough for now. - #[cfg(feature = "perfmap")] - config.profiler(wasmtime::ProfilingStrategy::PerfMap); - - if let Some(data_dir) = data_dir { - let mut cache_config = wasmtime::CacheConfig::new(); - cache_config.with_directory(data_dir.wasmtime_cache().0); - match wasmtime::Cache::new(cache_config) { - Ok(cache) => { - config.cache(Some(cache)); - } - Err(e) => { - // caching is just an optimization, so if it fails, just log and continue - tracing::warn!("failed to set up wasmtime cache: {e:#}") - } + let weak_sync_engine = sync_engine.weak(); + let weak_async_engine = async_engine.weak(); + epoch_ticker(move || { + let mut ticked = false; + if let Some(engine) = weak_sync_engine.upgrade() { + engine.increment_epoch(); + ticked = true; } + if let Some(engine) = weak_async_engine.upgrade() { + engine.increment_epoch(); + ticked = true; + } + ticked.then_some(()) + }); + + let mut sync_linker = Box::new(Linker::new(&sync_engine)); + WasmtimeModule::link_imports(&mut sync_linker).unwrap(); + + let mut async_linker = Box::new(Linker::new(&async_engine)); + WasmtimeAsyncModule::link_imports(&mut async_linker).unwrap(); + + let config = runtime_config; + WasmtimeRuntime { + sync_engine, + sync_linker, + async_engine, + async_linker, + config, } + } +} - let engine = Engine::new(&config).unwrap(); +fn wasmtime_config(data_dir: Option<&ServerDataDir>, async_support: bool) -> wasmtime::Config { + let mut config = wasmtime::Config::new(); + config + .cranelift_opt_level(wasmtime::OptLevel::Speed) + .consume_fuel(true) + .epoch_interruption(true) + .wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); - let weak_engine = engine.weak(); - epoch_ticker(move || { - let engine = weak_engine.upgrade()?; - engine.increment_epoch(); - Some(()) - }); + if async_support { + // Procedure instances need async support to suspend execution when waiting for + // e.g. HTTP responses or the transaction lock. Main-lane instances use a + // separate sync engine so reducers/views do not pay Wasmtime's fiber overhead. + config.async_support(true); - let mut linker = Box::new(Linker::new(&engine)); - WasmtimeModule::link_imports(&mut linker).unwrap(); + #[cfg(unix)] + config + .async_stack_size(self::pooling_stack_creator::ASYNC_STACK_SIZE) + .with_host_stack(self::pooling_stack_creator::PoolingStackCreator::new()); + } - let config = runtime_config; - WasmtimeRuntime { engine, linker, config } + // Offer a compile-time flag for enabling perfmap generation, + // so `perf` can display JITted symbol names. + // Ideally we would be able to configure this at runtime via a flag to `spacetime start`, + // but this is good enough for now. + #[cfg(feature = "perfmap")] + config.profiler(wasmtime::ProfilingStrategy::PerfMap); + + if let Some(data_dir) = data_dir { + let mut cache_config = wasmtime::CacheConfig::new(); + cache_config.with_directory(data_dir.wasmtime_cache().0); + match wasmtime::Cache::new(cache_config) { + Ok(cache) => { + config.cache(Some(cache)); + } + Err(e) => { + // caching is just an optimization, so if it fails, just log and continue + tracing::warn!("failed to set up wasmtime cache: {e:#}") + } + } } + + config } pub type Module = WasmModuleHostActor; +pub type ProcedureModule = WasmModuleHostActor; pub type ModuleInstance = WasmModuleInstance; const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 8; -fn wasm_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { +fn wasm_main_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { + let hex = database_identity.to_hex(); + // We use the tail of the identity to avoid the common structured prefix. + let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..]; + format!("wasm-main-{suffix}") +} + +fn wasm_procedure_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String { let hex = database_identity.to_hex(); // We use the tail of the identity to avoid the common structured prefix. let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..]; - format!("wasm-{suffix}") + format!("wasm-proc-{suffix}") } impl WasmtimeRuntime { @@ -126,7 +158,7 @@ impl WasmtimeRuntime { core: AllocatedJobCore, ) -> anyhow::Result { let module = - wasmtime::Module::new(&self.engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?; + wasmtime::Module::new(&self.sync_engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?; let func_imports = module .imports() @@ -136,17 +168,29 @@ impl WasmtimeRuntime { abi::verify_supported(WasmtimeModule::IMPLEMENTED_ABI, abi)?; let module = self - .linker + .sync_linker .instantiate_pre(&module) .map_err(InitializationError::Instantiation)?; + let procedure_module = + wasmtime::Module::new(&self.async_engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?; + let procedure_module = self + .async_linker + .instantiate_pre(&procedure_module) + .map_err(InitializationError::Instantiation)?; let module = WasmtimeModule::new(module); - let executor_thread_name = wasm_executor_thread_name(&mcc.replica_ctx.database_identity); + let procedure_module = WasmtimeAsyncModule::new(procedure_module); + let main_thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity); + let procedure_thread_name = wasm_procedure_executor_thread_name(&mcc.replica_ctx.database_identity); let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?; + let procedure_module = module.with_runtime_module(procedure_module)?; Ok(super::module_host::ModuleWithInstance::Wasm { module, - executor: core.spawn_named_async_executor(executor_thread_name), + procedure_module, + main_thread_name, + procedure_thread_name, + core, init_inst: Box::new(init_inst), procedure_instance_pool_size: self.config.procedure_instance_pool_size, }) diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 7c0b2a088b2..63eb862cddb 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1785,6 +1785,7 @@ impl WasmInstanceEnv { view_call.sender, args_source.0, result_sink, + true, )?; Ok(code) diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 0690120eb16..2f94d2d9116 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -22,8 +22,8 @@ use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use spacetimedb_schema::def::ModuleDef; use spacetimedb_schema::identifier::Identifier; use wasmtime::{ - AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace, WasmParams, - WasmResults, + AsContext, AsContextMut, Caller, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace, + WasmParams, WasmResults, }; fn log_traceback(func_type: &str, func: &str, e: &wasmtime::Error) { @@ -45,6 +45,11 @@ pub struct WasmtimeModule { module: InstancePre, } +#[derive(Clone)] +pub struct WasmtimeAsyncModule { + module: InstancePre, +} + impl WasmtimeModule { pub(super) fn new(module: InstancePre) -> Self { WasmtimeModule { module } @@ -53,50 +58,121 @@ impl WasmtimeModule { pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 5); pub(super) fn link_imports(linker: &mut Linker) -> anyhow::Result<()> { - const { assert!(WasmtimeModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION) }; - macro_rules! link_functions { - ($($module:literal :: $func:ident,)*) => { - #[allow(deprecated)] - linker$(.func_wrap($module, stringify!($func), WasmInstanceEnv::$func)?)*; - } - } - macro_rules! link_async_functions { - ($($module:literal :: $func:ident,)*) => { - #[allow(deprecated)] - linker$(.func_wrap_async($module, stringify!($func), WasmInstanceEnv::$func)?)*; - } - } - abi_funcs!(link_functions, link_async_functions); - Ok(()) + link_imports(linker, AsyncImportMode::SyncStub) } } -impl module_host_actor::WasmModule for WasmtimeModule { - type Instance = WasmtimeInstance; - type InstancePre = Self; - - type ExternType = ExternType; +impl WasmtimeAsyncModule { + pub(super) fn new(module: InstancePre) -> Self { + WasmtimeAsyncModule { module } + } - fn get_export(&self, s: &str) -> Option { - self.module - .module() - .exports() - .find(|exp| exp.name() == s) - .map(|exp| exp.ty()) + pub(super) fn link_imports(linker: &mut Linker) -> anyhow::Result<()> { + link_imports(linker, AsyncImportMode::Async) } +} - fn for_each_export(&self, mut f: impl FnMut(&str, &Self::ExternType) -> Result<(), E>) -> Result<(), E> { - self.module - .module() - .exports() - .try_for_each(|exp| f(exp.name(), &exp.ty())) +#[derive(Copy, Clone)] +enum AsyncImportMode { + SyncStub, + Async, +} + +fn link_imports(linker: &mut Linker, async_import_mode: AsyncImportMode) -> anyhow::Result<()> { + const { assert!(WasmtimeModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION) }; + // `abi_funcs!` separates the ABI into normal sync imports and async-only imports. + // Sync imports are linked the same way for both Wasmtime modules. Async-only imports + // are linked as real `func_wrap_async` imports for procedure instances, but as sync + // stubs for main-lane instances so accidental use fails with a clear module error. + macro_rules! link_functions { + ($($module:literal :: $func:ident,)*) => { + #[allow(deprecated)] + linker$(.func_wrap($module, stringify!($func), WasmInstanceEnv::$func)?)*; + } + } + macro_rules! link_async_functions { + ($($module:literal :: $func:ident,)*) => { + $( + match async_import_mode { + AsyncImportMode::SyncStub => { + link_sync_stub_for_async_function(linker, $module, stringify!($func))?; + } + AsyncImportMode::Async => { + #[allow(deprecated)] + linker.func_wrap_async($module, stringify!($func), WasmInstanceEnv::$func)?; + } + } + )* + } } + abi_funcs!(link_functions, link_async_functions); + Ok(()) +} - fn instantiate_pre(&self) -> Result { - Ok(self.clone()) +fn link_sync_stub_for_async_function( + linker: &mut Linker, + module: &'static str, + name: &'static str, +) -> anyhow::Result<()> { + match name { + "procedure_sleep_until" => { + linker.func_wrap(module, name, procedure_sleep_until_sync_stub)?; + } + "procedure_http_request" => { + linker.func_wrap(module, name, procedure_http_request_sync_stub)?; + } + _ => anyhow::bail!("missing sync Wasmtime stub for async ABI import `{module}::{name}`"), } + Ok(()) +} + +fn procedure_sleep_until_sync_stub(_: Caller<'_, WasmInstanceEnv>, _: i64) -> anyhow::Result { + anyhow::bail!("procedure_sleep_until is only available in async instances") +} + +fn procedure_http_request_sync_stub( + _: Caller<'_, WasmInstanceEnv>, + _: u32, + _: u32, + _: u32, + _: u32, + _: u32, +) -> anyhow::Result { + anyhow::bail!("procedure_http_request is only available in async instances") +} + +macro_rules! impl_wasmtime_module { + ($module:ty) => { + impl module_host_actor::WasmModule for $module { + type Instance = WasmtimeInstance; + type InstancePre = Self; + type ExternType = ExternType; + + fn get_export(&self, s: &str) -> Option { + self.module + .module() + .exports() + .find(|exp| exp.name() == s) + .map(|exp| exp.ty()) + } + + fn for_each_export(&self, mut f: impl FnMut(&str, &Self::ExternType) -> Result<(), E>) -> Result<(), E> { + self.module + .module() + .exports() + .try_for_each(|exp| f(exp.name(), &exp.ty())) + } + + fn instantiate_pre(&self) -> Result { + Ok(self.clone()) + } + } + }; } +impl_wasmtime_module!(WasmtimeModule); +impl_wasmtime_module!(WasmtimeAsyncModule); + fn handle_error_sink_code(code: i32, error: Vec) -> Result<(), ExecutionError> { handle_result_sink_code(code, error).map(drop) } @@ -140,23 +216,27 @@ const CALL_FAILURE: i32 = HOST_CALL_FAILURE.get() as i32; /// Invoke `typed_func` and assert that it doesn't yield. /// -/// Our Wasmtime is configured for `async` execution, and will panic if we use the non-async [`TypedFunc::call`]. -/// The `async` config is necessary to allow procedures to suspend, e.g. when making HTTP calls or acquiring transactions. -/// However, most of the WASM we execute, incl. reducers and startup functions, should never block/yield. -/// Rather than crossing our fingers and trusting, we run [`TypedFunc::call_async`] in [`FutureExt::now_or_never`], -/// an "async executor" which invokes [`std::task::Future::poll`] exactly once. +/// Main-lane instances are created with sync Wasmtime support and use [`TypedFunc::call`]. +/// Procedure instances are created with async Wasmtime support, but some procedure-side helper +/// calls still must complete without yielding while holding internal transaction state. Those use +/// [`TypedFunc::call_async`] with [`FutureExt::now_or_never`] to enforce immediate completion. pub(super) fn call_sync_typed_func( typed_func: &TypedFunc, mut ctx: impl AsContextMut, args: Args, + supports_async: bool, ) -> anyhow::Result where Args: WasmParams + Sync, Ret: WasmResults + Sync, { - let fut = typed_func.call_async(ctx.as_context_mut(), args); - fut.now_or_never() - .expect("`call_async` of supposedly synchronous WASM function returned `Poll::Pending`") + if supports_async { + let fut = typed_func.call_async(ctx.as_context_mut(), args); + fut.now_or_never() + .expect("`call_async` of supposedly synchronous WASM function returned `Poll::Pending`") + } else { + typed_func.call(ctx.as_context_mut(), args) + } } #[allow(clippy::too_many_arguments)] @@ -169,6 +249,7 @@ pub(super) fn call_view_export( sender: Option, args_source: u32, result_sink: u32, + supports_async: bool, ) -> anyhow::Result { if let Some(sender) = sender { let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(sender); @@ -184,6 +265,7 @@ pub(super) fn call_view_export( &call_view, ctx.as_context_mut(), (fn_ptr, sender_0, sender_1, sender_2, sender_3, args_source, result_sink), + supports_async, ) } else { let call_view_anon = call_view_anon.ok_or_else(|| { @@ -198,6 +280,7 @@ pub(super) fn call_view_export( &call_view_anon, ctx.as_context_mut(), (fn_ptr, args_source, result_sink), + supports_async, ) } } @@ -206,79 +289,132 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { type Instance = WasmtimeInstance; fn instantiate(&self, env: InstanceEnv, func_names: &FuncNames) -> Result { - let env = WasmInstanceEnv::new(env); - let mut store = Store::new(self.module.module().engine(), env); - let instance_fut = self.module.instantiate_async(&mut store); - - let instance = instance_fut - .now_or_never() - .expect("`instantiate_async` did not immediately return `Ready`") - .map_err(InitializationError::Instantiation)?; - - let mem = Mem::extract(&instance, &mut store).unwrap(); - store.data_mut().instantiate(mem); - - store.epoch_deadline_callback(|store| { - let env = store.data(); - let database = env.instance_env().replica_ctx.database_identity; - let funcall = env.log_record_function().unwrap_or_default(); - let dur = env.funcall_start().elapsed(); - // TODO(procedure-timing): This measurement is not super meaningful for procedures, - // which may (will) suspend execution and therefore may not have been continuously running since `env.funcall_start`. - tracing::warn!(funcall, ?database, "Wasm has been running for {dur:?}"); - Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND)) - }); + instantiate_wasmtime_instance( + //hello + &self.module, + env, + func_names, + false, + // Instantiate an async-disabled wasmtime instance + instantiate_sync, + ) + } +} + +impl module_host_actor::WasmInstancePre for WasmtimeAsyncModule { + type Instance = WasmtimeInstance; + + fn instantiate(&self, env: InstanceEnv, func_names: &FuncNames) -> Result { + instantiate_wasmtime_instance( + &self.module, + env, + func_names, + true, + // Instantiate an async-enabled wasmtime instance + instantiate_async, + ) + } +} + +fn instantiate_sync( + module: &InstancePre, + store: &mut Store, +) -> Result { + module.instantiate(store).map_err(InitializationError::Instantiation) +} + +fn instantiate_async( + module: &InstancePre, + store: &mut Store, +) -> Result { + let instance_fut = module.instantiate_async(store); + instance_fut + .now_or_never() + .expect("`instantiate_async` did not immediately return `Ready`") + .map_err(InitializationError::Instantiation) +} - // Note: this budget is just for initializers - set_store_fuel(&mut store, FunctionBudget::DEFAULT_BUDGET.into()); - store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND); +fn instantiate_wasmtime_instance( + module: &InstancePre, + env: InstanceEnv, + func_names: &FuncNames, + supports_async: bool, + instantiate: impl FnOnce( + &InstancePre, + &mut Store, + ) -> Result, +) -> Result { + let env = WasmInstanceEnv::new(env); + let mut store = Store::new(module.module().engine(), env); + + let instance = instantiate(module, &mut store)?; + + let mem = Mem::extract(&instance, &mut store).unwrap(); + store.data_mut().instantiate(mem); + + store.epoch_deadline_callback(|store| { + let env = store.data(); + let database = env.instance_env().replica_ctx.database_identity; + let funcall = env.log_record_function().unwrap_or_default(); + let dur = env.funcall_start().elapsed(); + // TODO(procedure-timing): This measurement is not super meaningful for procedures, + // which may (will) suspend execution and therefore may not have been continuously running since `env.funcall_start`. + tracing::warn!(funcall, ?database, "Wasm has been running for {dur:?}"); + Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND)) + }); + + // Note: this budget is just for initializers + set_store_fuel(&mut store, FunctionBudget::DEFAULT_BUDGET.into()); + store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND); - for preinit in &func_names.preinits { - let func = instance.get_typed_func::<(), ()>(&mut store, preinit).unwrap(); - call_sync_typed_func(&func, &mut store, ()).map_err(|err| InitializationError::RuntimeError { + for preinit in &func_names.preinits { + let func = instance.get_typed_func::<(), ()>(&mut store, preinit).unwrap(); + call_sync_typed_func(&func, &mut store, (), supports_async).map_err(|err| { + InitializationError::RuntimeError { err, func: preinit.clone(), - })?; - } - - if let Ok(init) = instance.get_typed_func::(&mut store, SETUP_DUNDER) { - let setup_error = store.data_mut().setup_standard_bytes_sink(); - let res = call_sync_typed_func(&init, &mut store, setup_error); - let error = store.data_mut().take_standard_bytes_sink(); + } + })?; + } - let res = res - .map_err(ExecutionError::Trap) - .and_then(|code| handle_error_sink_code(code, error)); + if let Ok(init) = instance.get_typed_func::(&mut store, SETUP_DUNDER) { + let setup_error = store.data_mut().setup_standard_bytes_sink(); + let res = call_sync_typed_func(&init, &mut store, setup_error, supports_async); + let error = store.data_mut().take_standard_bytes_sink(); - res.map_err(|e| match e { - ExecutionError::User(err) => InitializationError::Setup(err), - ExecutionError::Recoverable(err) | ExecutionError::Trap(err) => { - let func = SETUP_DUNDER.to_owned(); - InitializationError::RuntimeError { err, func } - } - })? - } + let res = res + .map_err(ExecutionError::Trap) + .and_then(|code| handle_error_sink_code(code, error)); - let call_reducer = instance - .get_typed_func(&mut store, CALL_REDUCER_DUNDER) - .expect("no call_reducer"); - - let call_procedure = get_call_procedure(&mut store, &instance); - let call_view = get_call_view(&mut store, &instance); - let call_view_anon = get_call_view_anon(&mut store, &instance); - store - .data_mut() - .set_call_view_exports(call_view.clone(), call_view_anon.clone()); - - Ok(WasmtimeInstance { - store, - instance, - call_reducer, - call_procedure, - call_view, - call_view_anon, - }) + res.map_err(|e| match e { + ExecutionError::User(err) => InitializationError::Setup(err), + ExecutionError::Recoverable(err) | ExecutionError::Trap(err) => { + let func = SETUP_DUNDER.to_owned(); + InitializationError::RuntimeError { err, func } + } + })? } + + let call_reducer = instance + .get_typed_func(&mut store, CALL_REDUCER_DUNDER) + .expect("no call_reducer"); + + let call_procedure = get_call_procedure(&mut store, &instance); + let call_view = get_call_view(&mut store, &instance); + let call_view_anon = get_call_view_anon(&mut store, &instance); + store + .data_mut() + .set_call_view_exports(call_view.clone(), call_view_anon.clone()); + + Ok(WasmtimeInstance { + store, + instance, + call_reducer, + call_procedure, + call_view, + call_view_anon, + supports_async, + }) } /// Look up the `instance`'s export named by [`CALL_PROCEDURE_DUNDER`]. @@ -408,6 +544,7 @@ pub struct WasmtimeInstance { call_procedure: Option, call_view: Option, call_view_anon: Option, + supports_async: bool, } impl module_host_actor::WasmInstance for WasmtimeInstance { @@ -422,7 +559,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let sink = self.store.data_mut().setup_standard_bytes_sink(); run_describer(log_traceback, || { - call_sync_typed_func(&describer, &mut self.store, sink) + call_sync_typed_func(&describer, &mut self.store, sink, self.supports_async) })?; // Fetch the bsatn returned by the describer call. @@ -479,6 +616,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { args_source.0, errors_sink, ), + self.supports_async, ); let (stats, error) = finish_opcall(store, budget); @@ -512,6 +650,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { Some(*op.sender), args_source.0, errors_sink, + self.supports_async, ); let (stats, result_bytes) = finish_opcall(store, budget); @@ -548,6 +687,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { None, args_source.0, errors_sink, + self.supports_async, ); let (stats, result_bytes) = finish_opcall(store, budget); @@ -570,6 +710,14 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { budget: FunctionBudget, ) -> (module_host_actor::ProcedureExecuteResult, Option) { let store = &mut self.store; + if !self.supports_async { + let res = module_host_actor::ProcedureExecuteResult { + stats: zero_execution_stats(store), + call_result: Err(anyhow::anyhow!("procedures require an async Wasmtime instance")), + }; + return (res, None); + } + prepare_store_for_call(store, budget); // Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays. diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 015bc1dc1ef..89c18ee5242 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -1,5 +1,5 @@ use std::panic::AssertUnwindSafe; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{mpsc as std_mpsc, Arc, Mutex, Weak}; use core_affinity::CoreId; use futures::future::LocalBoxFuture; @@ -219,14 +219,27 @@ pub struct AllocatedJobCore { } impl AllocatedJobCore { + pub fn into_shared(self) -> (Arc, CorePinner) { + (Arc::new(self.guard), self.pinner) + } + /// Spawn a [`SingleCoreExecutor`] allocated to this core. - pub fn spawn_async_executor(self) -> SingleCoreExecutor { - SingleCoreExecutor::spawn(self, None) + pub fn spawn_async_executor( + guard: Arc, + pinner: CorePinner, + name: impl Into, + ) -> SingleCoreExecutor { + SingleCoreExecutor::spawn_and_pin(guard, pinner, Some(name.into())) } - /// Spawn a named [`SingleCoreExecutor`] allocated to this core. - pub fn spawn_named_async_executor(self, name: impl Into) -> SingleCoreExecutor { - SingleCoreExecutor::spawn(self, Some(name.into())) + /// Spawn a [`SingleThreadedExecutor`] allocated to this core. + pub fn spawn_executor( + guard: Arc, + pinner: CorePinner, + state: S, + name: impl Into, + ) -> SingleThreadedExecutor { + SingleThreadedExecutor::spawn_and_pin(guard, pinner, state, Some(name.into())) } } @@ -299,8 +312,11 @@ struct SingleCoreExecutorInner { impl SingleCoreExecutor { /// Spawn a `SingleCoreExecutor` on the given core. fn spawn(core: AllocatedJobCore, name: Option) -> Self { - let AllocatedJobCore { guard, mut pinner } = core; + let (guard, pinner) = core.into_shared(); + Self::spawn_and_pin(guard, pinner, name) + } + fn spawn_and_pin(guard: Arc, mut pinner: CorePinner, name: Option) -> Self { let (job_tx, mut job_rx) = mpsc::unbounded_channel(); let inner = Arc::new(SingleCoreExecutorInner { job_tx }); @@ -405,6 +421,111 @@ impl SingleCoreExecutor { } } +/// A handle to a plain OS-thread executor for synchronous database work. +/// +/// Unlike [`SingleCoreExecutor`], it is intended for synchronous runtimes. +/// This executor never enters Tokio and never polls futures on its worker +/// thread. +pub struct SingleThreadedExecutor { + inner: Arc>, +} + +struct SingleThreadedExecutorInner { + job_tx: std_mpsc::Sender>, +} + +type SyncJob = Box; + +impl Clone for SingleThreadedExecutor { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl SingleThreadedExecutor { + fn spawn_and_pin( + guard: Arc, + mut pinner: CorePinner, + mut state: S, + name: Option, + ) -> Self { + let (job_tx, job_rx) = std_mpsc::channel::>(); + + let inner = Arc::new(SingleThreadedExecutorInner { job_tx }); + + let mut thread = std::thread::Builder::new(); + if let Some(name) = name { + thread = thread.name(name); + } + let worker = move || { + let _guard = guard; + pinner.pin_now(); + + while let Ok(job) = job_rx.recv() { + pinner.pin_if_changed(); + job(&mut state); + } + }; + thread + .spawn(worker) + .expect("failed to spawn thread for `SingleThreadedExecutor`"); + + Self { inner } + } + + /// Run `f` on this database executor and return its result. + pub async fn run_job(&self, f: F) -> R + where + F: FnOnce(&mut S) -> R + Send + 'static, + R: Send + 'static, + { + let span = tracing::Span::current(); + let (tx, rx) = oneshot::channel(); + + self.inner + .job_tx + .send(Box::new(move |state| { + let result = std::panic::catch_unwind(AssertUnwindSafe(|| { + let _entered = span.enter(); + f(state) + })); + if let Err(Err(_panic)) = tx.send(result) { + tracing::warn!("uncaught panic on `SingleThreadedExecutor`") + } + })) + .unwrap_or_else(|_| panic!("job thread exited")); + + match rx.await.unwrap() { + Ok(r) => r, + Err(e) => std::panic::resume_unwind(e), + } + } + + /// Enqueue `f` without waiting for its result. + pub fn enqueue_job(&self, f: F) + where + F: FnOnce(&mut S) + Send + 'static, + { + let span = tracing::Span::current(); + + self.inner + .job_tx + .send(Box::new(move |state| { + if std::panic::catch_unwind(AssertUnwindSafe(|| { + let _entered = span.enter(); + f(state); + })) + .is_err() + { + tracing::warn!("uncaught panic on `SingleThreadedExecutor`") + } + })) + .unwrap_or_else(|_| panic!("job thread exited")); + } +} + /// On drop, tells the [`JobCores`] that this database is no longer occupying its core, /// allowing databases from more-contended runtimes/cores to migrate there. #[derive(Default)]