Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement graceful shutdown #2456

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5562d1b
feat: add util crate with the task module
CHr15F0x Dec 16, 2024
55d6ec9
refactor: use rayon where tokio::task::spawn_blocking was sub-optimal
CHr15F0x Dec 17, 2024
1643911
feat(util/task): add cancellation aware std::thread::spawn wrapper
CHr15F0x Dec 17, 2024
4108bef
refactor: merge make_stream into the util crate
CHr15F0x Dec 17, 2024
915e902
refactor: move AnyhowExt into the util crate
CHr15F0x Dec 17, 2024
b2b47ab
feat(make_stream/from_blocking): bail out upon cancellation
CHr15F0x Dec 17, 2024
5815256
doc(util/task): minor refinement of doc comments
CHr15F0x Dec 17, 2024
323eee3
fix(sync/checkpoint): rollback to anchor is not committed to the DB
CHr15F0x Dec 17, 2024
8b3eed3
feat: track tasks in pathfinder-ethereum and pathfinder crates
CHr15F0x Dec 17, 2024
fb4c41b
chore: cargo sort
CHr15F0x Dec 17, 2024
c277aeb
chore: remove dead code
CHr15F0x Dec 17, 2024
6ec855c
feat(rpc): enable graceful shutdown for the rpc server
CHr15F0x Dec 18, 2024
8cd70f7
feat: track tasks in pathfinder-rpc
CHr15F0x Dec 18, 2024
191e667
feat(pathfinder): force exit after a grace period
CHr15F0x Dec 18, 2024
9e22b2a
feat(pathfinder/config): add grace period to config
CHr15F0x Dec 18, 2024
18e6fce
feat(monitoring): add graceful shutdown
CHr15F0x Dec 18, 2024
9ef6e5f
refactor(main): reorder initialization towards disallowing task detac…
CHr15F0x Dec 19, 2024
1983efe
fix(p2p): storage connection pool too small
CHr15F0x Dec 23, 2024
df099a4
fix: critical task errors not propagated to process exit
CHr15F0x Dec 23, 2024
4f55593
feat: make sure the last RW db connection pool is dropped as the last
CHr15F0x Dec 23, 2024
b5665cf
feat: force orderly cancellation even if rpc server or p2p fail to start
CHr15F0x Dec 23, 2024
f380908
fixup: allow interrupting migrations and trie pruning
CHr15F0x Dec 23, 2024
b5358ab
chore: update changelog
CHr15F0x Dec 23, 2024
0f49802
doc: fix typo
CHr15F0x Dec 24, 2024
95c98e1
doc: fix another typo
CHr15F0x Dec 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- Graceful shutdown upon SIGINT and SIGTERM with a default grace period of 10 seconds, configurable via `--shutdown.grace-period`.

### Fixed

- `pathfinder_getProof`, `pathfinder_getClassProof` return `ProofMissing` (10001) when Pathfinder is in `archive` mode and queried block's tries are empty.
Expand Down
31 changes: 19 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ members = [
"crates/gateway-test-fixtures",
"crates/gateway-test-utils",
"crates/gateway-types",
"crates/make-stream",
"crates/merkle-tree",
"crates/p2p",
"crates/p2p_proto",
Expand All @@ -23,6 +22,7 @@ members = [
"crates/storage",
"crates/tagged",
"crates/tagged-debug-derive",
"crates/util",
]
exclude = [
"crates/load-test",
Expand Down Expand Up @@ -137,6 +137,7 @@ tokio = "1.37.0"
tokio-retry = "0.3.0"
tokio-stream = "0.1.14"
tokio-tungstenite = "0.21"
tokio-util = { version = "0.7.13", features = ["rt"] }
tower = { version = "0.4.13", default-features = false }
tower-http = { version = "0.5.2", default-features = false }
tracing = "0.1.37"
Expand Down
1 change: 0 additions & 1 deletion crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use serde::{Deserialize, Serialize};
pub mod casm_class;
pub mod class_definition;
pub mod consts;
pub mod error;
pub mod event;
pub mod hash;
mod header;
Expand Down
2 changes: 1 addition & 1 deletion crates/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ reqwest = { workspace = true, features = ["json"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true }

util = { path = "../util" }
11 changes: 10 additions & 1 deletion crates/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ impl EthereumApi for EthereumClient {
let provider_clone = provider.clone();
let (finalized_block_tx, mut finalized_block_rx) =
tokio::sync::mpsc::channel::<L1BlockNumber>(1);
tokio::spawn(async move {

util::task::spawn(async move {
let mut interval = tokio::time::interval(poll_interval);
loop {
interval.tick().await;
Expand All @@ -165,6 +166,14 @@ impl EthereumApi for EthereumClient {
let _ = finalized_block_tx.send(block_number).await.unwrap();
}
}
// This is to mitigate the warning: "this function depends on never type
// fallback being `()`", as we are unable to implement
// [`util::task::FutureOutputExt`] for the never type `!`. Once this warning
// becomes a hard error we should keep this workaround, unless by then the
// `!` type is no longer experimental.
// Warning related issue: https://github.com/rust-lang/rust/issues/123748
#[allow(unreachable_code)]
()
});

// Process incoming events
Expand Down
2 changes: 1 addition & 1 deletion crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ libp2p = { workspace = true, features = [
"tokio",
"yamux",
] }
make-stream = { path = "../make-stream" }
p2p_proto = { path = "../p2p_proto" }
p2p_stream = { path = "../p2p_stream" }
pathfinder-common = { path = "../common" }
Expand All @@ -55,6 +54,7 @@ tokio-stream = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
unsigned-varint = { workspace = true, features = ["futures"] }
util = { path = "../util" }
void = { workspace = true }
zeroize = { workspace = true }

Expand Down
10 changes: 5 additions & 5 deletions crates/p2p/src/client/peer_agnostic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ mod header_stream {

tracing::trace!(?start, ?stop, ?dir, "Streaming headers");

make_stream::from_future(move |tx| async move {
util::make_stream::from_future(move |tx| async move {
// Loop which refreshes peer set once we exhaust it.
loop {
'next_peer: for peer in get_peers().await {
Expand Down Expand Up @@ -778,7 +778,7 @@ mod transaction_stream {
{
tracing::trace!(?start, ?stop, "Streaming Transactions");

make_stream::from_future(move |tx| async move {
util::make_stream::from_future(move |tx| async move {
let mut expected_transaction_counts_stream = Box::pin(counts_stream);

let cnt = match try_next(&mut expected_transaction_counts_stream).await {
Expand Down Expand Up @@ -961,7 +961,7 @@ mod state_diff_stream {
{
tracing::trace!(?start, ?stop, "Streaming state diffs");

make_stream::from_future(move |tx| async move {
util::make_stream::from_future(move |tx| async move {
let mut length_stream = Box::pin(length_stream);

let cnt = match try_next(&mut length_stream).await {
Expand Down Expand Up @@ -1185,7 +1185,7 @@ mod class_definition_stream {
{
tracing::trace!(?start, ?stop, "Streaming classes");

make_stream::from_future(move |tx| async move {
util::make_stream::from_future(move |tx| async move {
let mut declared_class_counts_stream = Box::pin(counts_stream);

let cnt = match try_next(&mut declared_class_counts_stream).await {
Expand Down Expand Up @@ -1384,7 +1384,7 @@ mod event_stream {
{
tracing::trace!(?start, ?stop, "Streaming events");

make_stream::from_future(move |tx| async move {
util::make_stream::from_future(move |tx| async move {
let mut counts_stream = Box::pin(counts_stream);

let Some(Ok(cnt)) = counts_stream.next().await else {
Expand Down
2 changes: 1 addition & 1 deletion crates/pathfinder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ futures = { workspace = true, features = ["alloc"] }
http = { workspace = true }
ipnet = { workspace = true }
jemallocator = { workspace = true }
make-stream = { path = "../make-stream" }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
p2p = { path = "../p2p" }
Expand Down Expand Up @@ -71,6 +70,7 @@ tracing-subscriber = { workspace = true, features = [
"ansi",
] }
url = { workspace = true }
util = { path = "../util" }
zeroize = { workspace = true }
zstd = { workspace = true }

Expand Down
21 changes: 16 additions & 5 deletions crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,15 @@ This should only be enabled for debugging purposes as it adds substantial proces
action=ArgAction::Set
)]
fetch_casm_from_fgw: bool,

#[arg(
long = "shutdown.grace-period",
value_name = "Seconds",
long_help = "Timeout duration for graceful shutdown after receiving a SIGINT or SIGTERM",
env = "PATHFINDER_SHUTDOWN_GRACE_PERIOD",
default_value = "10"
)]
shutdown_grace_period: std::num::NonZeroU64,
}

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq)]
Expand Down Expand Up @@ -704,8 +713,8 @@ pub struct Config {
pub execution_concurrency: Option<std::num::NonZeroU32>,
pub sqlite_wal: JournalMode,
pub max_rpc_connections: std::num::NonZeroUsize,
pub poll_interval: std::time::Duration,
pub l1_poll_interval: std::time::Duration,
pub poll_interval: Duration,
pub l1_poll_interval: Duration,
pub color: Color,
pub log_output_json: bool,
pub disable_version_update_check: bool,
Expand All @@ -724,6 +733,7 @@ pub struct Config {
pub custom_versioned_constants: Option<VersionedConstants>,
pub feeder_gateway_fetch_concurrency: NonZeroUsize,
pub fetch_casm_from_fgw: bool,
pub shutdown_grace_period: Duration,
}

pub struct Ethereum {
Expand Down Expand Up @@ -769,7 +779,7 @@ pub struct P2PConfig;

pub struct DebugConfig {
pub pretty_log: bool,
pub restart_delay: std::time::Duration,
pub restart_delay: Duration,
}

impl NetworkConfig {
Expand Down Expand Up @@ -955,7 +965,7 @@ impl DebugConfig {
fn parse(_: ()) -> Self {
Self {
pretty_log: false,
restart_delay: std::time::Duration::from_secs(60),
restart_delay: Duration::from_secs(60),
}
}
}
Expand All @@ -965,7 +975,7 @@ impl DebugConfig {
fn parse(args: DebugCli) -> Self {
Self {
pretty_log: args.pretty_log,
restart_delay: std::time::Duration::from_secs(args.restart_delay),
restart_delay: Duration::from_secs(args.restart_delay),
}
}
}
Expand Down Expand Up @@ -1018,6 +1028,7 @@ impl Config {
.custom_versioned_constants_path
.map(parse_versioned_constants_or_exit),
fetch_casm_from_fgw: cli.fetch_casm_from_fgw,
shutdown_grace_period: Duration::from_secs(cli.shutdown_grace_period.get()),
}
}
}
Expand Down
Loading