Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 132 additions & 105 deletions crates/core/src/host/module_host.rs

Large diffs are not rendered by default.

344 changes: 215 additions & 129 deletions crates/core/src/host/scheduler.rs

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,10 +1362,7 @@ fn handle_main_worker_request(
}
JsMainWorkerRequest::ScheduledReducer { reply_tx, params } => {
handle_worker_request("scheduled_reducer", reply_tx, || {
let (res, trapped) = instance_common
.call_scheduled_function(params, inst)
.now_or_never()
.expect("our call_scheduled_function implementation is not actually async");
let (res, trapped) = instance_common.call_scheduled_reducer(params, inst);
(res, trapped)
})
}
Expand Down Expand Up @@ -1463,9 +1460,9 @@ fn handle_procedure_worker_request(
JsProcedureWorkerRequest::ScheduledProcedure { reply_tx, params } => {
handle_worker_request("scheduled_procedure", reply_tx, || {
let (res, trapped) = instance_common
.call_scheduled_function(params, inst)
.call_scheduled_procedure(params, inst)
.now_or_never()
.expect("our call_scheduled_function implementation is not actually async");
.expect("our call_scheduled_procedure implementation is not actually async");
(res, trapped)
})
}
Expand Down
34 changes: 30 additions & 4 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,15 @@ impl<T: WasmModule> WasmModuleHostActor<T> {

Ok((module, initial_instance))
}

pub fn with_runtime_module<U: WasmModule>(&self, module: U) -> Result<WasmModuleHostActor<U>, InitializationError> {
let module = module.instantiate_pre()?;
Ok(WasmModuleHostActor {
module,
common: self.common.clone(),
func_names: self.func_names.clone(),
})
}
}

impl<T: WasmModule> WasmModuleHostActor<T> {
Expand Down Expand Up @@ -528,11 +537,20 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
res
}

pub(in crate::host) async fn call_scheduled_function(
pub(in crate::host) async fn call_scheduled_procedure(
&mut self,
params: ScheduledFunctionParams,
) -> CallScheduledFunctionResult {
let (res, trapped) = self.common.call_scheduled_function(params, &mut self.instance).await;
let (res, trapped) = self.common.call_scheduled_procedure(params, &mut self.instance).await;
self.trapped = trapped;
res
}

pub(in crate::host) fn call_scheduled_reducer(
&mut self,
params: ScheduledFunctionParams,
) -> CallScheduledFunctionResult {
let (res, trapped) = self.common.call_scheduled_reducer(params, &mut self.instance);
self.trapped = trapped;
res
}
Expand Down Expand Up @@ -1363,12 +1381,20 @@ impl InstanceCommon {
self.info.relational_db().clear_all_clients().map_err(Into::into)
}

pub(crate) async fn call_scheduled_function<I: WasmInstance>(
pub(crate) async fn call_scheduled_procedure<I: WasmInstance>(
&mut self,
params: ScheduledFunctionParams,
inst: &mut I,
) -> (CallScheduledFunctionResult, bool) {
crate::host::scheduler::call_scheduled_procedure(&self.info.clone(), params, self, inst).await
}

pub(crate) fn call_scheduled_reducer<I: WasmInstance>(
&mut self,
params: ScheduledFunctionParams,
inst: &mut I,
) -> (CallScheduledFunctionResult, bool) {
crate::host::scheduler::call_scheduled_function(&self.info.clone(), params, self, inst).await
crate::host::scheduler::call_scheduled_reducer(&self.info.clone(), params, self, inst)
}
}

Expand Down
160 changes: 102 additions & 58 deletions crates/core/src/host/wasmtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ use spacetimedb_paths::server::ServerDataDir;
use std::borrow::Cow;
use std::time::Duration;
use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut};
pub use wasmtime_module::{WasmtimeInstance, WasmtimeModule};
pub use wasmtime_module::{WasmtimeAsyncModule, WasmtimeInstance, WasmtimeModule};

#[cfg(unix)]
mod pooling_stack_creator;
mod wasm_instance_env;
mod wasmtime_module;

pub struct WasmtimeRuntime {
engine: Engine,
linker: Box<Linker<WasmInstanceEnv>>,
sync_engine: Engine,
sync_linker: Box<Linker<WasmInstanceEnv>>,
async_engine: Engine,
async_linker: Box<Linker<WasmInstanceEnv>>,
config: WasmConfig,
}

Expand All @@ -46,76 +48,106 @@ pub(crate) fn epoch_ticker(mut on_tick: impl 'static + Send + FnMut() -> Option<

impl WasmtimeRuntime {
pub fn new(data_dir: Option<&ServerDataDir>, runtime_config: WasmConfig) -> Self {
let mut config = wasmtime::Config::new();
config
.cranelift_opt_level(wasmtime::OptLevel::Speed)
.consume_fuel(true)
.epoch_interruption(true)
.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable)
// We need async support to enable suspending execution of procedures
// when waiting for e.g. HTTP responses or the transaction lock.
// We don't enable either fuel-based or epoch-based yielding
// (see https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.epoch_deadline_async_yield_and_update
// and https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.fuel_async_yield_interval)
// so reducers will always execute to completion during the first `Future::poll` call,
// and procedures will only yield when performing an asynchronous operation.
// These futures are executed on a separate single-threaded executor not related to the "global" Tokio runtime,
// which is responsible only for executing WASM. See `crate::util::jobs` for this infrastructure.
.async_support(true);
let sync_config = wasmtime_config(data_dir, false);
let async_config = wasmtime_config(data_dir, true);

#[cfg(unix)]
config
.async_stack_size(self::pooling_stack_creator::ASYNC_STACK_SIZE)
.with_host_stack(self::pooling_stack_creator::PoolingStackCreator::new());
let sync_engine = Engine::new(&sync_config).unwrap();
let async_engine = Engine::new(&async_config).unwrap();

// Offer a compile-time flag for enabling perfmap generation,
// so `perf` can display JITted symbol names.
// Ideally we would be able to configure this at runtime via a flag to `spacetime start`,
// but this is good enough for now.
#[cfg(feature = "perfmap")]
config.profiler(wasmtime::ProfilingStrategy::PerfMap);

if let Some(data_dir) = data_dir {
let mut cache_config = wasmtime::CacheConfig::new();
cache_config.with_directory(data_dir.wasmtime_cache().0);
match wasmtime::Cache::new(cache_config) {
Ok(cache) => {
config.cache(Some(cache));
}
Err(e) => {
// caching is just an optimization, so if it fails, just log and continue
tracing::warn!("failed to set up wasmtime cache: {e:#}")
}
let weak_sync_engine = sync_engine.weak();
let weak_async_engine = async_engine.weak();
epoch_ticker(move || {
let mut ticked = false;
if let Some(engine) = weak_sync_engine.upgrade() {
engine.increment_epoch();
ticked = true;
}
if let Some(engine) = weak_async_engine.upgrade() {
engine.increment_epoch();
ticked = true;
}
ticked.then_some(())
});

let mut sync_linker = Box::new(Linker::new(&sync_engine));
WasmtimeModule::link_imports(&mut sync_linker).unwrap();

let mut async_linker = Box::new(Linker::new(&async_engine));
WasmtimeAsyncModule::link_imports(&mut async_linker).unwrap();

let config = runtime_config;
WasmtimeRuntime {
sync_engine,
sync_linker,
async_engine,
async_linker,
config,
}
}
}

let engine = Engine::new(&config).unwrap();
fn wasmtime_config(data_dir: Option<&ServerDataDir>, async_support: bool) -> wasmtime::Config {
let mut config = wasmtime::Config::new();
config
.cranelift_opt_level(wasmtime::OptLevel::Speed)
.consume_fuel(true)
.epoch_interruption(true)
.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);

let weak_engine = engine.weak();
epoch_ticker(move || {
let engine = weak_engine.upgrade()?;
engine.increment_epoch();
Some(())
});
if async_support {
// Procedure instances need async support to suspend execution when waiting for
// e.g. HTTP responses or the transaction lock. Main-lane instances use a
// separate sync engine so reducers/views do not pay Wasmtime's fiber overhead.
config.async_support(true);

let mut linker = Box::new(Linker::new(&engine));
WasmtimeModule::link_imports(&mut linker).unwrap();
#[cfg(unix)]
config
.async_stack_size(self::pooling_stack_creator::ASYNC_STACK_SIZE)
.with_host_stack(self::pooling_stack_creator::PoolingStackCreator::new());
}

let config = runtime_config;
WasmtimeRuntime { engine, linker, config }
// Offer a compile-time flag for enabling perfmap generation,
// so `perf` can display JITted symbol names.
// Ideally we would be able to configure this at runtime via a flag to `spacetime start`,
// but this is good enough for now.
#[cfg(feature = "perfmap")]
config.profiler(wasmtime::ProfilingStrategy::PerfMap);

if let Some(data_dir) = data_dir {
let mut cache_config = wasmtime::CacheConfig::new();
cache_config.with_directory(data_dir.wasmtime_cache().0);
match wasmtime::Cache::new(cache_config) {
Ok(cache) => {
config.cache(Some(cache));
}
Err(e) => {
// caching is just an optimization, so if it fails, just log and continue
tracing::warn!("failed to set up wasmtime cache: {e:#}")
}
}
}

config
}

pub type Module = WasmModuleHostActor<WasmtimeModule>;
pub type ProcedureModule = WasmModuleHostActor<WasmtimeAsyncModule>;
pub type ModuleInstance = WasmModuleInstance<WasmtimeInstance>;

const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 8;

fn wasm_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
fn wasm_main_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
let hex = database_identity.to_hex();
// We use the tail of the identity to avoid the common structured prefix.
let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..];
format!("wasm-main-{suffix}")
}

fn wasm_procedure_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
let hex = database_identity.to_hex();
// We use the tail of the identity to avoid the common structured prefix.
let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..];
format!("wasm-{suffix}")
format!("wasm-proc-{suffix}")
}

impl WasmtimeRuntime {
Expand All @@ -126,7 +158,7 @@ impl WasmtimeRuntime {
core: AllocatedJobCore,
) -> anyhow::Result<super::module_host::ModuleWithInstance> {
let module =
wasmtime::Module::new(&self.engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?;
wasmtime::Module::new(&self.sync_engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?;

let func_imports = module
.imports()
Expand All @@ -136,17 +168,29 @@ impl WasmtimeRuntime {
abi::verify_supported(WasmtimeModule::IMPLEMENTED_ABI, abi)?;

let module = self
.linker
.sync_linker
.instantiate_pre(&module)
.map_err(InitializationError::Instantiation)?;
let procedure_module =
wasmtime::Module::new(&self.async_engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?;
let procedure_module = self
.async_linker
.instantiate_pre(&procedure_module)
.map_err(InitializationError::Instantiation)?;

let module = WasmtimeModule::new(module);
let executor_thread_name = wasm_executor_thread_name(&mcc.replica_ctx.database_identity);
let procedure_module = WasmtimeAsyncModule::new(procedure_module);
let main_thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity);
let procedure_thread_name = wasm_procedure_executor_thread_name(&mcc.replica_ctx.database_identity);

let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?;
let procedure_module = module.with_runtime_module(procedure_module)?;
Ok(super::module_host::ModuleWithInstance::Wasm {
module,
executor: core.spawn_named_async_executor(executor_thread_name),
procedure_module,
main_thread_name,
procedure_thread_name,
core,
init_inst: Box::new(init_inst),
procedure_instance_pool_size: self.config.procedure_instance_pool_size,
})
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,7 @@ impl WasmInstanceEnv {
view_call.sender,
args_source.0,
result_sink,
true,
)?;

Ok(code)
Expand Down
Loading
Loading