Skip to content

Commit

Permalink
Make sure we can reach the user's requested FVM concurrency (filecoin…
Browse files Browse the repository at this point in the history
…-project#449)

We previously used the FVM's `ThreadedExecutor` to execute messages on
separate threads because the FVM requires 64MiB of stack space.

1. The FVM v3 supported for 8 concurrent threads.
2. The FVM v4 supports up to the number of CPU threads available.

Unfortunately, neither version was influenced by the
`LOTUS_FVM_CONCURRENCY` environment variable.

This patch fixes this by:

1. Moving the thread-pool to the FFI itself (sharing it between FVM
versions).
2. Setting the thread-pool size equal to `LOTUS_FVM_CONCURRENCY`.

It also defaults `LOTUS_FVM_CONCURRENCY` to the number of available
CPU threads instead of the previous 4.

NOTE: I've also tried increasing the stack size instead of using
threads, but Go _does not_ like it when other foreign mess with the
stack size of _its_ threads (but it has no problem if we create our own
threads).

fixes filecoin-project/lotus#11817
  • Loading branch information
Stebalien authored Apr 10, 2024
1 parent b715c94 commit 5868337
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 66 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ serde = "1.0.117"
serde_tuple = "0.5"
safer-ffi = { version = "0.0.7", features = ["proc_macros"] }
filecoin-proofs-api = { version = "16.1", default-features = false }
yastl = "0.1.2"

[dev-dependencies]
memmap2 = "0.5"
tempfile = "3.0.8"

[features]
default = ["cuda", "multicore-sdr" ]
default = ["cuda", "multicore-sdr"]
blst-portable = ["bls-signatures/blst-portable", "blstrs/portable"]
cuda = ["filecoin-proofs-api/cuda", "rust-gpu-tools/cuda", "fvm2/cuda", "fvm3/cuda", "fvm4/cuda"]
cuda-supraseal = ["filecoin-proofs-api/cuda-supraseal", "rust-gpu-tools/cuda", "fvm3/cuda-supraseal", "fvm4/cuda-supraseal"]
Expand Down
73 changes: 10 additions & 63 deletions rust/src/fvm/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::ops::RangeInclusive;
use std::sync::{Arc, Mutex};

use anyhow::anyhow;
Expand All @@ -18,7 +17,7 @@ use super::externs::CgoExterns;
use super::types::*;

// Generic executor; uses the current (v3) engine types
pub trait CgoExecutor {
pub trait CgoExecutor: Send {
fn execute_message(
&mut self,
msg: Message,
Expand Down Expand Up @@ -65,9 +64,6 @@ pub struct MultiEngineContainer {
engines: Mutex<HashMap<EngineVersion, Arc<dyn AbstractMultiEngine + 'static>>>,
}

const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY";
const VALID_CONCURRENCY_RANGE: RangeInclusive<u32> = 1..=128;

impl TryFrom<u32> for EngineVersion {
type Error = anyhow::Error;
fn try_from(value: u32) -> Result<Self, Self::Error> {
Expand All @@ -81,41 +77,6 @@ impl TryFrom<u32> for EngineVersion {
}

impl MultiEngineContainer {
/// Constructs a new multi-engine container with the default concurrency (4).
pub fn new() -> MultiEngineContainer {
Self::with_concurrency(4)
}

/// Constructs a new multi-engine container with the concurrency specified in the
/// `LOTUS_FVM_CONCURRENCY` environment variable.
pub fn new_env() -> MultiEngineContainer {
let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) {
Some(v) => v,
None => return Self::new(),
};
let valstr = match valosstr.to_str() {
Some(s) => s,
None => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value");
return Self::new();
}
};
let concurrency: u32 = match valstr.parse() {
Ok(v) => v,
Err(e) => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}");
return Self::new();
}
};
if !VALID_CONCURRENCY_RANGE.contains(&concurrency) {
log::error!(
"{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}"
);
return Self::new();
}
Self::with_concurrency(concurrency)
}

pub fn with_concurrency(concurrency: u32) -> MultiEngineContainer {
MultiEngineContainer {
engines: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -146,12 +107,6 @@ impl MultiEngineContainer {
}
}

impl Default for MultiEngineContainer {
fn default() -> MultiEngineContainer {
MultiEngineContainer::new()
}
}

// fvm v4 implementation
mod v4 {
use anyhow::anyhow;
Expand All @@ -160,10 +115,7 @@ mod v4 {

use fvm4::call_manager::DefaultCallManager as DefaultCallManager4;
use fvm4::engine::{EnginePool as EnginePool4, MultiEngine as MultiEngine4};
use fvm4::executor::{
ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4,
ThreadedExecutor as ThreadedExecutor4,
};
use fvm4::executor::{ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4};
use fvm4::kernel::filecoin::DefaultFilecoinKernel as DefaultFilecoinKernel4;
use fvm4::machine::{DefaultMachine as DefaultMachine4, NetworkConfig};
use fvm4_shared::{chainid::ChainID, clock::ChainEpoch, message::Message};
Expand All @@ -175,14 +127,13 @@ mod v4 {
use super::Config;

type CgoMachine4 = DefaultMachine4<CgoBlockstore, CgoExterns>;
type BaseExecutor4 = DefaultExecutor4<DefaultFilecoinKernel4<DefaultCallManager4<CgoMachine4>>>;
type CgoExecutor4 = ThreadedExecutor4<BaseExecutor4>;
type CgoExecutor4 = DefaultExecutor4<DefaultFilecoinKernel4<DefaultCallManager4<CgoMachine4>>>;

fn new_executor(
engine_pool: EnginePool4,
machine: CgoMachine4,
) -> anyhow::Result<CgoExecutor4> {
Ok(ThreadedExecutor4(BaseExecutor4::new(engine_pool, machine)?))
CgoExecutor4::new(engine_pool, machine)
}

impl CgoExecutor for CgoExecutor4 {
Expand Down Expand Up @@ -254,8 +205,7 @@ mod v3 {
};
use fvm3::engine::{EnginePool as EnginePool3, MultiEngine as MultiEngine3};
use fvm3::executor::{
ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3,
DefaultExecutor as DefaultExecutor3, ThreadedExecutor as ThreadedExecutor3,
ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3, DefaultExecutor as DefaultExecutor3,
};
use fvm3::machine::{DefaultMachine as DefaultMachine3, NetworkConfig as NetworkConfig3};
use fvm3::trace::ExecutionEvent as ExecutionEvent3;
Expand Down Expand Up @@ -284,14 +234,13 @@ mod v3 {
use super::Config;

type CgoMachine3 = DefaultMachine3<CgoBlockstore, CgoExterns>;
type BaseExecutor3 = DefaultExecutor3<DefaultKernel3<DefaultCallManager3<CgoMachine3>>>;
type CgoExecutor3 = ThreadedExecutor3<BaseExecutor3>;
type CgoExecutor3 = DefaultExecutor3<DefaultKernel3<DefaultCallManager3<CgoMachine3>>>;

fn new_executor(
engine_pool: EnginePool3,
machine: CgoMachine3,
) -> anyhow::Result<CgoExecutor3> {
Ok(ThreadedExecutor3(BaseExecutor3::new(engine_pool, machine)?))
CgoExecutor3::new(engine_pool, machine)
}

impl CgoExecutor for CgoExecutor3 {
Expand Down Expand Up @@ -533,8 +482,7 @@ mod v2 {
backtrace::Cause as Cause2, DefaultCallManager as DefaultCallManager2,
};
use fvm2::executor::{
ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2,
DefaultExecutor as DefaultExecutor2, ThreadedExecutor as ThreadedExecutor2,
ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2, DefaultExecutor as DefaultExecutor2,
};
use fvm2::machine::{
DefaultMachine as DefaultMachine2, MultiEngine as MultiEngine2,
Expand Down Expand Up @@ -565,11 +513,10 @@ mod v2 {
use super::Config;

type CgoMachine2 = DefaultMachine2<CgoBlockstore, CgoExterns>;
type BaseExecutor2 = DefaultExecutor2<DefaultKernel2<DefaultCallManager2<CgoMachine2>>>;
type CgoExecutor2 = ThreadedExecutor2<BaseExecutor2>;
type CgoExecutor2 = DefaultExecutor2<DefaultKernel2<DefaultCallManager2<CgoMachine2>>>;

fn new_executor(machine: CgoMachine2) -> CgoExecutor2 {
ThreadedExecutor2(BaseExecutor2::new(machine))
CgoExecutor2::new(machine)
}

fn bytes_to_block(bytes: RawBytes) -> Option<IpldBlock> {
Expand Down
69 changes: 67 additions & 2 deletions rust/src/fvm/machine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::convert::{TryFrom, TryInto};
use std::ops::RangeInclusive;

use anyhow::{anyhow, Context};
use cid::Cid;
Expand Down Expand Up @@ -27,8 +28,54 @@ use super::types::*;
use crate::destructor;
use crate::util::types::{catch_panic_response, catch_panic_response_no_default, Result};

const STACK_SIZE: usize = 64 << 20; // 64MiB

lazy_static! {
static ref ENGINES: MultiEngineContainer = MultiEngineContainer::new_env();
static ref CONCURRENCY: u32 = get_concurrency();
static ref ENGINES: MultiEngineContainer = MultiEngineContainer::with_concurrency(*CONCURRENCY);
static ref THREAD_POOL: yastl::Pool = yastl::Pool::with_config(
*CONCURRENCY as usize,
yastl::ThreadConfig::new()
.prefix("fvm")
.stack_size(STACK_SIZE)
);
}

const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY";
const VALID_CONCURRENCY_RANGE: RangeInclusive<u32> = 1..=256;

fn available_parallelism() -> u32 {
std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(8) as u32
}

fn get_concurrency() -> u32 {
let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) {
Some(v) => v,
None => return available_parallelism(),
};
let valstr = match valosstr.to_str() {
Some(s) => s,
None => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value");
return available_parallelism();
}
};
let concurrency: u32 = match valstr.parse() {
Ok(v) => v,
Err(e) => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}");
return available_parallelism();
}
};
if !VALID_CONCURRENCY_RANGE.contains(&concurrency) {
log::error!(
"{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}"
);
return available_parallelism();
}
concurrency
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -181,14 +228,32 @@ fn create_fvm_debug_machine(
)
}

fn with_new_stack<F, T>(name: &str, pool: &yastl::Pool, callback: F) -> repr_c::Box<Result<T>>
where
T: Sized + Default + Send,
F: FnOnce() -> anyhow::Result<T> + std::panic::UnwindSafe + Send,
{
let mut res = None;
pool.scoped(|scope| scope.execute(|| res = Some(catch_panic_response(name, callback))));

res.unwrap_or_else(|| {
repr_c::Box::new(Result::err(
format!("failed to schedule {name}")
.into_bytes()
.into_boxed_slice(),
))
})
}

#[ffi_export]
fn fvm_machine_execute_message(
executor: &'_ InnerFvmMachine,
message: c_slice::Ref<u8>,
chain_len: u64,
apply_kind: u64, /* 0: Explicit, _: Implicit */
) -> repr_c::Box<Result<FvmMachineExecuteResponse>> {
catch_panic_response("fvm_machine_execute_message", || {
// Execute in the thread-pool because we need a 64MiB stack.
with_new_stack("fvm_machine_execute_message", &THREAD_POOL, || {
let apply_kind = if apply_kind == 0 {
ApplyKind::Explicit
} else {
Expand Down

0 comments on commit 5868337

Please sign in to comment.