diff --git a/Cargo.lock b/Cargo.lock index b6e32b2a..201b0c5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -884,7 +884,7 @@ dependencies = [ "bitflags 2.8.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "proc-macro2", @@ -2276,15 +2276,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.13.0" diff --git a/bin/host/Cargo.toml b/bin/host/Cargo.toml index 943d7704..b1d4322e 100644 --- a/bin/host/Cargo.toml +++ b/bin/host/Cargo.toml @@ -50,9 +50,9 @@ tracing.workspace = true reqwest.workspace = true serde_json.workspace = true async-trait.workspace = true +rocksdb = { workspace = true, features = ["snappy"] } tokio = { workspace = true, features = ["full"] } serde = { workspace = true, features = ["derive"] } -rocksdb = { workspace = true, features = ["snappy"] } clap = { workspace = true, features = ["derive", "env"] } tracing-subscriber = { workspace = true, features = ["fmt"] } diff --git a/bin/host/src/cli/mod.rs b/bin/host/src/cli/mod.rs index 9dc13bf9..9fd6273f 100644 --- a/bin/host/src/cli/mod.rs +++ b/bin/host/src/cli/mod.rs @@ -44,7 +44,7 @@ pub enum HostMode { } /// Styles for the CLI application. -const fn cli_styles() -> clap::builder::Styles { +pub(crate) const fn cli_styles() -> clap::builder::Styles { clap::builder::Styles::styled() .usage(Style::new().bold().underline().fg_color(Some(Color::Ansi(AnsiColor::Yellow)))) .header(Style::new().bold().underline().fg_color(Some(Color::Ansi(AnsiColor::Yellow)))) diff --git a/bin/host/src/eth/mod.rs b/bin/host/src/eth/mod.rs index ddbb40fc..749897f7 100644 --- a/bin/host/src/eth/mod.rs +++ b/bin/host/src/eth/mod.rs @@ -1,5 +1,10 @@ //! Ethereum utilities for the host binary. +use alloy_provider::ReqwestProvider; +use alloy_rpc_client::RpcClient; +use alloy_transport_http::Http; +use reqwest::Client; + mod blobs; pub use blobs::{ APIConfigResponse, APIGenesisResponse, OnlineBlobProvider, ReducedConfigData, @@ -8,3 +13,10 @@ pub use blobs::{ mod precompiles; pub(crate) use precompiles::execute; + +/// Returns an HTTP provider for the given URL. +pub fn http_provider(url: &str) -> ReqwestProvider { + let url = url.parse().unwrap(); + let http = Http::<Client>::new(url); + ReqwestProvider::new(RpcClient::new(http, true)) +} diff --git a/bin/host/src/fetcher.rs b/bin/host/src/fetcher.rs index 4e5498cb..bd131db1 100644 --- a/bin/host/src/fetcher.rs +++ b/bin/host/src/fetcher.rs @@ -1,4 +1,4 @@ -//! Fetcher trait definition. +//! [Fetcher] trait definition. use kona_preimage::{HintRouter, PreimageFetcher}; diff --git a/bin/host/src/interop/cli.rs b/bin/host/src/interop/cli.rs index 6c14c210..66061f54 100644 --- a/bin/host/src/interop/cli.rs +++ b/bin/host/src/interop/cli.rs @@ -1,30 +1,15 @@ //! This module contains all CLI-specific code for the interop entrypoint.
-use super::{ - local_kv::DEFAULT_CHAIN_ID, start_server, start_server_and_native_client, LocalKeyValueStore, -}; -use crate::{ - cli::{parse_b256, parse_bytes}, - eth::OnlineBlobProvider, - kv::{DiskKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore, SplitKeyValueStore}, -}; +use super::local_kv::DEFAULT_CHAIN_ID; +use crate::cli::{cli_styles, parse_b256, parse_bytes}; use alloy_primitives::{Bytes, B256}; -use alloy_provider::{Provider, ReqwestProvider}; use alloy_rlp::Decodable; -use alloy_rpc_client::RpcClient; -use alloy_transport_http::Http; use anyhow::{anyhow, Result}; -use clap::{ - builder::styling::{AnsiColor, Color, Style}, - Parser, -}; +use clap::Parser; use kona_proof_interop::PreState; use maili_genesis::RollupConfig; -use reqwest::Client; use serde::Serialize; -use std::{collections::HashMap, path::PathBuf, sync::Arc}; -use tokio::sync::RwLock; -use tracing::error; +use std::{collections::HashMap, path::PathBuf}; /// The host binary CLI application arguments. #[derive(Default, Parser, Serialize, Clone, Debug)] @@ -97,26 +82,6 @@ pub struct InteropHostCli { } impl InteropHostCli { - /// Runs the host binary in single-chain mode. - pub async fn run(self) -> Result<()> { - if self.server { - start_server(self).await?; - } else { - let status = match start_server_and_native_client(self).await { - Ok(status) => status, - Err(e) => { - error!(target: "kona_host", "Exited with an error: {:?}", e); - panic!("{e}"); - } - }; - - // Bubble up the exit status of the client program. - std::process::exit(status as i32); - } - - Ok(()) - } - /// Returns `true` if the host is running in offline mode. pub const fn is_offline(&self) -> bool { self.l1_node_address.is_none() && @@ -150,57 +115,6 @@ impl InteropHostCli { } } - /// Creates the providers associated with the [InteropHostCli] configuration. - /// - /// ## Returns - /// - A [ReqwestProvider] for the L1 node. - /// - An [OnlineBlobProvider] for the L1 beacon node. - /// - A hash map of chain ID -> [ReqwestProvider] for the L2 nodes. - pub async fn create_providers( - &self, - ) -> Result<(ReqwestProvider, OnlineBlobProvider, HashMap<u64, ReqwestProvider>)> { - let l1_provider = Self::http_provider( - self.l1_node_address.as_ref().ok_or(anyhow!("Provider must be set"))?, - ); - - let blob_provider = OnlineBlobProvider::new_http( - self.l1_beacon_address.clone().ok_or(anyhow!("Beacon API URL must be set"))?, - ) - .await - .map_err(|e| anyhow!("Failed to load blob provider configuration: {e}"))?; - - // Resolve all chain IDs to their corresponding providers. - let l2_node_addresses = - self.l2_node_addresses.as_ref().ok_or(anyhow!("L2 node addresses must be set"))?; - let mut l2_providers = HashMap::with_capacity(l2_node_addresses.len()); - for l2_node_address in l2_node_addresses { - let l2_provider = Self::http_provider(l2_node_address); - let chain_id = l2_provider.get_chain_id().await?; - - l2_providers.insert(chain_id, l2_provider); - } - - Ok((l1_provider, blob_provider, l2_providers)) - } - - /// Parses the CLI arguments and returns a new instance of a [SharedKeyValueStore], as it is - /// configured to be created.
- pub fn construct_kv_store(&self) -> SharedKeyValueStore { - let local_kv_store = LocalKeyValueStore::new(self.clone()); - - let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = self.data_dir { - let disk_kv_store = DiskKeyValueStore::new(data_dir.clone()); - let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store); - Arc::new(RwLock::new(split_kv_store)) - } else { - let mem_kv_store = MemoryKeyValueStore::new(); - let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store); - Arc::new(RwLock::new(split_kv_store)) - }; - - kv_store - } - /// Reads the [RollupConfig]s from the file system and returns a map of L2 chain ID -> /// [RollupConfig]s. pub fn read_rollup_configs(&self) -> Result<HashMap<u64, RollupConfig>> { @@ -226,23 +140,4 @@ impl InteropHostCli { }, ) } - - /// Returns an HTTP provider for the given URL. - fn http_provider(url: &str) -> ReqwestProvider { - let url = url.parse().unwrap(); - let http = Http::<Client>::new(url); - ReqwestProvider::new(RpcClient::new(http, true)) - } -} - -/// Styles for the CLI application. -const fn cli_styles() -> clap::builder::Styles { - clap::builder::Styles::styled() - .usage(Style::new().bold().underline().fg_color(Some(Color::Ansi(AnsiColor::Yellow)))) - .header(Style::new().bold().underline().fg_color(Some(Color::Ansi(AnsiColor::Yellow)))) - .literal(Style::new().fg_color(Some(Color::Ansi(AnsiColor::Green)))) - .invalid(Style::new().bold().fg_color(Some(Color::Ansi(AnsiColor::Red)))) - .error(Style::new().bold().fg_color(Some(Color::Ansi(AnsiColor::Red)))) - .valid(Style::new().bold().underline().fg_color(Some(Color::Ansi(AnsiColor::Green)))) - .placeholder(Style::new().fg_color(Some(Color::Ansi(AnsiColor::White)))) } diff --git a/bin/host/src/interop/fetcher.rs b/bin/host/src/interop/fetcher.rs index 4db80f4e..aae2f933 100644 --- a/bin/host/src/interop/fetcher.rs +++ b/bin/host/src/interop/fetcher.rs @@ -2,7 +2,7 @@ //! preimages from a remote source serving the super-chain (interop) proof mode. use super::InteropHostCli; -use crate::{eth::OnlineBlobProvider, kv::KeyValueStore}; +use crate::eth::OnlineBlobProvider; use alloy_consensus::{Header, TxEnvelope, EMPTY_ROOT_HASH}; use alloy_eips::{ eip2718::Encodable2718, @@ -18,6 +18,7 @@ use alloy_rpc_types::{ }; use anyhow::{anyhow, Result}; use async_trait::async_trait; +use kona_host::KeyValueStore; use kona_preimage::{ errors::{PreimageOracleError, PreimageOracleResult}, HintRouter, PreimageFetcher, PreimageKey, PreimageKeyType, diff --git a/bin/host/src/interop/local_kv.rs b/bin/host/src/interop/local_kv.rs index db36498c..d9bcdeb0 100644 --- a/bin/host/src/interop/local_kv.rs +++ b/bin/host/src/interop/local_kv.rs @@ -2,9 +2,9 @@ //! using the [InteropHostCli] config. use super::InteropHostCli; -use crate::kv::KeyValueStore; use alloy_primitives::{keccak256, B256}; use anyhow::Result; +use kona_host::KeyValueStore; use kona_preimage::PreimageKey; use kona_proof_interop::boot::{ L1_HEAD_KEY, L2_AGREED_PRE_STATE_KEY, L2_CHAIN_ID_KEY, L2_CLAIMED_POST_STATE_KEY, diff --git a/bin/host/src/interop/mod.rs b/bin/host/src/interop/mod.rs index 053eb264..0bba2e1c 100644 --- a/bin/host/src/interop/mod.rs +++ b/bin/host/src/interop/mod.rs @@ -1,15 +1,5 @@ //! This module contains the super-chain (interop) mode for the host.
-use crate::{kv::KeyValueStore, server::PreimageServer}; -use anyhow::Result; -use kona_preimage::{ - BidirectionalChannel, HintReader, HintWriter, NativeChannel, OracleReader, OracleServer, -}; -use kona_std_fpvm::{FileChannel, FileDescriptor}; -use std::sync::Arc; -use tokio::{sync::RwLock, task}; -use tracing::info; - mod cli; pub use cli::InteropHostCli; @@ -19,100 +9,5 @@ pub use local_kv::LocalKeyValueStore; mod fetcher; pub use fetcher::InteropFetcher; -/// Starts the [PreimageServer] in the primary thread. In this mode, the host program has been -/// invoked by the Fault Proof VM and the client program is running in the parent process. -pub async fn start_server(cfg: InteropHostCli) -> Result<()> { - let (preimage_chan, hint_chan) = ( - FileChannel::new(FileDescriptor::PreimageRead, FileDescriptor::PreimageWrite), - FileChannel::new(FileDescriptor::HintRead, FileDescriptor::HintWrite), - ); - let oracle_server = OracleServer::new(preimage_chan); - let hint_reader = HintReader::new(hint_chan); - let kv_store = cfg.construct_kv_store(); - let fetcher = if !cfg.is_offline() { - let (l1_provider, blob_provider, l2_providers) = cfg.create_providers().await?; - Some(Arc::new(RwLock::new(InteropFetcher::new( - cfg, - kv_store.clone(), - l1_provider, - blob_provider, - l2_providers, - )))) - } else { - None - }; - - // Start the server and wait for it to complete. - info!("Starting preimage server."); - PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher).start().await?; - info!("Preimage server has exited."); - - Ok(()) -} - -/// Starts the [PreimageServer] and the client program in separate threads. The client program is -/// ran natively in this mode. -/// -/// ## Takes -/// - `cfg`: The host configuration. -/// -/// ## Returns -/// - `Ok(exit_code)` if the client program exits successfully. -/// - `Err(_)` if the client program failed to execute, was killed by a signal, or the host program -/// exited first. -pub async fn start_server_and_native_client(cfg: InteropHostCli) -> Result<i32> { - let hint_chan = BidirectionalChannel::new()?; - let preimage_chan = BidirectionalChannel::new()?; - let kv_store = cfg.construct_kv_store(); - let fetcher = if !cfg.is_offline() { - let (l1_provider, blob_provider, l2_providers) = cfg.create_providers().await?; - Some(Arc::new(RwLock::new(InteropFetcher::new( - cfg, - kv_store.clone(), - l1_provider, - blob_provider, - l2_providers, - )))) - } else { - None - }; - - // Create the server and start it. - let server_task = task::spawn(start_native_preimage_server( - kv_store, - fetcher, - hint_chan.host, - preimage_chan.host, - )); - - // Start the client program in a separate child process. - let program_task = task::spawn(kona_client::interop::run( - OracleReader::new(preimage_chan.client), - HintWriter::new(hint_chan.client), - None, - )); - - // Execute both tasks and wait for them to complete. - info!("Starting preimage server and client program."); - let (_, client_result) = tokio::try_join!(server_task, program_task,)?; - info!(target: "kona_host", "Preimage server and client program have joined."); - - Ok(client_result.is_err() as i32) -} - -/// Starts the preimage server in a separate thread. The client program is ran natively in this -/// mode.
-pub async fn start_native_preimage_server( - kv_store: Arc<RwLock<KV>>, - fetcher: Option<Arc<RwLock<InteropFetcher<KV>>>>, - hint_chan: NativeChannel, - preimage_chan: NativeChannel, -) -> Result<()> -where - KV: KeyValueStore + Send + Sync + ?Sized + 'static, -{ - let hint_reader = HintReader::new(hint_chan); - let oracle_server = OracleServer::new(preimage_chan); - - PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher).start().await -} +mod orchestrator; +pub use orchestrator::InteropProviders; diff --git a/bin/host/src/interop/orchestrator.rs b/bin/host/src/interop/orchestrator.rs new file mode 100644 index 00000000..3e9d0c4d --- /dev/null +++ b/bin/host/src/interop/orchestrator.rs @@ -0,0 +1,105 @@ +//! [InteropHostCli]'s [HostOrchestrator] + [DetachedHostOrchestrator] implementations. + +use super::{InteropFetcher, InteropHostCli, LocalKeyValueStore}; +use crate::eth::{http_provider, OnlineBlobProvider}; +use alloy_provider::{Provider, ReqwestProvider}; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use kona_host::{ + DetachedHostOrchestrator, DiskKeyValueStore, Fetcher, HostOrchestrator, MemoryKeyValueStore, + SharedKeyValueStore, SplitKeyValueStore, }; +use kona_preimage::{HintWriter, NativeChannel, OracleReader}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; + +/// The providers required for the interop host. +#[derive(Debug)] +pub struct InteropProviders { + /// The L1 EL provider. + l1_provider: ReqwestProvider, + /// The L1 beacon node provider. + blob_provider: OnlineBlobProvider, + /// The L2 EL providers, keyed by chain ID. + l2_providers: HashMap<u64, ReqwestProvider>, +} + +#[async_trait] +impl HostOrchestrator for InteropHostCli { + type Providers = InteropProviders; + + async fn create_providers(&self) -> Result<Option<Self::Providers>> { + if self.is_offline() { + return Ok(None); + } + + let l1_provider = + http_provider(self.l1_node_address.as_ref().ok_or(anyhow!("Provider must be set"))?); + + let blob_provider = OnlineBlobProvider::new_http( + self.l1_beacon_address.clone().ok_or(anyhow!("Beacon API URL must be set"))?, + ) + .await + .map_err(|e| anyhow!("Failed to load blob provider configuration: {e}"))?; + + // Resolve all chain IDs to their corresponding providers. + let l2_node_addresses = + self.l2_node_addresses.as_ref().ok_or(anyhow!("L2 node addresses must be set"))?; + let mut l2_providers = HashMap::with_capacity(l2_node_addresses.len()); + for l2_node_address in l2_node_addresses { + let l2_provider = http_provider(l2_node_address); + let chain_id = l2_provider.get_chain_id().await?; + + l2_providers.insert(chain_id, l2_provider); + } + + Ok(Some(InteropProviders { l1_provider, blob_provider, l2_providers })) + } + + fn create_fetcher( + &self, + providers: Option<Self::Providers>, + kv_store: SharedKeyValueStore, + ) -> Option<Arc<RwLock<dyn Fetcher + Send + Sync>>> { + providers.map(|providers| { + // TODO: Don't pass the whole cfg to the interop fetcher.
+ Arc::new(RwLock::new(InteropFetcher::new( + self.clone(), + kv_store, + providers.l1_provider, + providers.blob_provider, + providers.l2_providers, + ))) + }) + } + + fn create_key_value_store(&self) -> Result<SharedKeyValueStore> { + let local_kv_store = LocalKeyValueStore::new(self.clone()); + + let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = self.data_dir { + let disk_kv_store = DiskKeyValueStore::new(data_dir.clone()); + let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store); + Arc::new(RwLock::new(split_kv_store)) + } else { + let mem_kv_store = MemoryKeyValueStore::new(); + let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store); + Arc::new(RwLock::new(split_kv_store)) + }; + + Ok(kv_store) + } + + async fn run_client_native( + hint_reader: HintWriter<NativeChannel>, + oracle_reader: OracleReader<NativeChannel>, + ) -> Result<()> { + kona_client::interop::run(oracle_reader, hint_reader, None).await.map_err(Into::into) + } +} + +#[async_trait] +impl DetachedHostOrchestrator for InteropHostCli { + fn is_detached(&self) -> bool { + self.server + } +} diff --git a/bin/host/src/lib.rs b/bin/host/src/lib.rs index a0c9a40e..1694e0cc 100644 --- a/bin/host/src/lib.rs +++ b/bin/host/src/lib.rs @@ -1,15 +1,21 @@ #![doc = include_str!("../README.md")] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -#![cfg_attr(not(test), warn(unused_crate_dependencies))] -pub mod cli; -pub use cli::{init_tracing_subscriber, HostCli}; +mod orchestrator; +pub use orchestrator::{DetachedHostOrchestrator, HostOrchestrator}; -pub mod interop; -pub mod single; +mod fetcher; +pub use fetcher::Fetcher; -pub mod eth; -pub mod fetcher; -pub mod kv; -pub mod preimage; -pub mod server; +mod kv; +pub use kv::{ + DiskKeyValueStore, KeyValueStore, MemoryKeyValueStore, SharedKeyValueStore, SplitKeyValueStore, +}; + +mod preimage; +pub use preimage::{ + OfflineHintRouter, OfflinePreimageFetcher, OnlineHintRouter, OnlinePreimageFetcher, +}; + +mod server; +pub use server::PreimageServer; diff --git a/bin/host/src/main.rs b/bin/host/src/main.rs index 8354d440..64f52fe6 100644 --- a/bin/host/src/main.rs +++ b/bin/host/src/main.rs @@ -1,10 +1,20 @@ //! Main entrypoint for the host binary. +#![warn(missing_debug_implementations, missing_docs, unreachable_pub, rustdoc::all)] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +use crate::cli::{init_tracing_subscriber, HostCli, HostMode}; use anyhow::Result; use clap::Parser; -use kona_host::{cli::HostMode, init_tracing_subscriber, HostCli}; +use kona_host::DetachedHostOrchestrator; use tracing::info; +pub mod cli; +pub mod eth; +pub mod interop; +pub mod single; + #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let cfg = HostCli::parse(); diff --git a/bin/host/src/orchestrator.rs b/bin/host/src/orchestrator.rs new file mode 100644 index 00000000..0cf478db --- /dev/null +++ b/bin/host/src/orchestrator.rs @@ -0,0 +1,132 @@ +//! Contains the [HostOrchestrator] trait, which defines entry points for the host to run a given +//! module. + +use crate::{Fetcher, PreimageServer, SharedKeyValueStore}; +use anyhow::Result; +use async_trait::async_trait; +use kona_preimage::{ + BidirectionalChannel, HintReader, HintWriter, NativeChannel, OracleReader, OracleServer, +}; +use kona_std_fpvm::{FileChannel, FileDescriptor}; +use std::sync::Arc; +use tokio::{sync::RwLock, task}; + +/// The host<->client communication channels.
Both ends of each channel are held in-process, as the client +/// program runs alongside the host in this mode. +#[derive(Debug)] +struct HostComms { + /// The host<->client hint channel. + pub hint: BidirectionalChannel, + /// The host<->client preimage channel. + pub preimage: BidirectionalChannel, +} + +/// The host->client communication channels when running in detached mode. The client channels are +/// held in a separate process. +#[derive(Debug)] +struct DetachedHostComms { + /// The host->client hint channel. + pub hint: FileChannel, + /// The host->client preimage channel. + pub preimage: FileChannel, +} + +/// The orchestrator is responsible for starting the host and client program, and managing the +/// communication between them. It is the entry point for the host to run a given module. +/// +/// This trait is specific to running both the host and client program in-process. For detached +/// mode, see [DetachedHostOrchestrator]. +#[async_trait] +pub trait HostOrchestrator { + /// A collection of the providers that the host can use to reference remote resources. + type Providers; + + /// Instantiates the providers for the host's fetcher. + async fn create_providers(&self) -> Result<Option<Self::Providers>>; + + /// Constructs the [KeyValueStore] for the host. + /// + /// [KeyValueStore]: crate::KeyValueStore + fn create_key_value_store(&self) -> Result<SharedKeyValueStore>; + + /// Creates a [Fetcher] for the host program's preimage server. + fn create_fetcher( + &self, + providers: Option<Self::Providers>, + kv_store: SharedKeyValueStore, + ) -> Option<Arc<RwLock<dyn Fetcher + Send + Sync>>>; + + /// Runs the client program natively. + async fn run_client_native( + hint_reader: HintWriter<NativeChannel>, + oracle_reader: OracleReader<NativeChannel>, + ) -> Result<()>; + + /// Starts the host and client program in-process. + async fn start(&self) -> Result<()> { + let comms = HostComms { + hint: BidirectionalChannel::new()?, + preimage: BidirectionalChannel::new()?, + }; + let kv_store = self.create_key_value_store()?; + let providers = self.create_providers().await?; + let fetcher = self.create_fetcher(providers, kv_store.clone()); + + let server_task = task::spawn( + PreimageServer::new( + OracleServer::new(comms.preimage.host), + HintReader::new(comms.hint.host), + kv_store, + fetcher, + ) + .start(), + ); + let client_task = task::spawn(Self::run_client_native( + HintWriter::new(comms.hint.client), + OracleReader::new(comms.preimage.client), + )); + + let (_, client_result) = tokio::try_join!(server_task, client_task)?; + + // Bubble up the exit status of the client program. + std::process::exit(client_result.is_err() as i32); + } +} + +/// The orchestrator for starting the host in detached mode, with the client program running in a +/// separate process. +#[async_trait] +pub trait DetachedHostOrchestrator: HostOrchestrator { + /// Returns whether the host is running in detached mode. + fn is_detached(&self) -> bool; + + /// Starts the host in detached mode, with the client program running in a separate process.
+ async fn run_detached(&self) -> Result<()> { + let comms = DetachedHostComms { + hint: FileChannel::new(FileDescriptor::HintRead, FileDescriptor::HintWrite), + preimage: FileChannel::new(FileDescriptor::PreimageRead, FileDescriptor::PreimageWrite), + }; + let kv_store = self.create_key_value_store()?; + let providers = self.create_providers().await?; + let fetcher = self.create_fetcher(providers, kv_store.clone()); + + PreimageServer::new( + OracleServer::new(comms.preimage), + HintReader::new(comms.hint), + kv_store, + fetcher, + ) + .start() + .await + } + + /// Override for [HostOrchestrator::start] that starts the host in detached mode, + /// if [DetachedHostOrchestrator::is_detached] returns `true`. + async fn run(&self) -> Result<()> { + if self.is_detached() { + self.run_detached().await + } else { + HostOrchestrator::start(self).await + } + } +} diff --git a/bin/host/src/server.rs b/bin/host/src/server.rs index 089d2714..9958c7a0 100644 --- a/bin/host/src/server.rs +++ b/bin/host/src/server.rs @@ -66,7 +66,8 @@ where )); let hint_router = spawn(Self::start_hint_router(self.hint_reader, self.fetcher)); - // Spawn tasks for the futures and wait for them to complete. + // Spawn tasks for the futures and wait for them to complete. If one of the tasks closes + // before the other, cancel the other task. tokio::select! { s = server => s.map_err(|e| anyhow!(e))?, h = hint_router => h.map_err(|e| anyhow!(e))?, @@ -87,6 +88,8 @@ where P: PreimageOracleServer, { loop { + // Serve the next preimage request. This `await` will yield to the runtime + // if no progress can be made. match server.next_preimage_request(fetcher).await { Ok(_) => continue, Err(PreimageOracleError::IOError(_)) => return Ok(()), @@ -98,7 +101,7 @@ where } } - info!("Starting oracle server"); + info!(target: "host-server", "Starting oracle server"); if let Some(fetcher) = fetcher.as_ref() { do_loop(&OnlinePreimageFetcher::new(Arc::clone(fetcher)), &oracle_server).await } else { @@ -116,6 +119,8 @@ where H: HintReaderServer, { loop { + // Route the next hint. This `await` will yield to the runtime if no progress can be + // made. match server.next_hint(router).await { Ok(_) => continue, Err(PreimageOracleError::IOError(_)) => return Ok(()), @@ -127,7 +132,7 @@ where } } - info!("Starting hint router"); + info!(target: "host-server", "Starting hint router"); if let Some(fetcher) = fetcher.as_ref() { do_loop(&OnlineHintRouter::new(Arc::clone(fetcher)), &hint_reader).await } else { diff --git a/bin/host/src/single/cli.rs b/bin/host/src/single/cli.rs index 97081d50..42dc6122 100644 --- a/bin/host/src/single/cli.rs +++ b/bin/host/src/single/cli.rs @@ -1,26 +1,12 @@ //! This module contains all CLI-specific code for the single chain entrypoint. -use super::{start_server, start_server_and_native_client, LocalKeyValueStore}; -use crate::{ - cli::parse_b256, - eth::OnlineBlobProvider, - kv::{DiskKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore, SplitKeyValueStore}, -}; +use crate::cli::{cli_styles, parse_b256}; use alloy_primitives::B256; -use alloy_provider::ReqwestProvider; -use alloy_rpc_client::RpcClient; -use alloy_transport_http::Http; use anyhow::{anyhow, Result}; -use clap::{ - builder::styling::{AnsiColor, Color, Style}, - Parser, -}; +use clap::Parser; use maili_genesis::RollupConfig; -use reqwest::Client; use serde::Serialize; -use std::{path::PathBuf, sync::Arc}; -use tokio::sync::RwLock; -use tracing::error; +use std::path::PathBuf; /// The host binary CLI application arguments. 
#[derive(Default, Parser, Serialize, Clone, Debug)] @@ -106,26 +92,6 @@ pub struct SingleChainHostCli { } impl SingleChainHostCli { - /// Runs the host binary in single-chain mode. - pub async fn run(self) -> Result<()> { - if self.server { - start_server(self).await?; - } else { - let status = match start_server_and_native_client(self).await { - Ok(status) => status, - Err(e) => { - error!(target: "kona_host", "Exited with an error: {:?}", e); - panic!("{e}"); - } - }; - - // Bubble up the exit status of the client program. - std::process::exit(status as i32); - } - - Ok(()) - } - /// Returns `true` if the host is running in offline mode. pub const fn is_offline(&self) -> bool { self.l1_node_address.is_none() && @@ -133,55 +99,6 @@ impl SingleChainHostCli { self.l1_beacon_address.is_none() } - /// Returns an HTTP provider for the given URL. - fn http_provider(url: &str) -> ReqwestProvider { - let url = url.parse().unwrap(); - let http = Http::<Client>::new(url); - ReqwestProvider::new(RpcClient::new(http, true)) - } - - /// Creates the providers associated with the [SingleChainHostCli] configuration. - /// - /// ## Returns - /// - A [ReqwestProvider] for the L1 node. - /// - An [OnlineBlobProvider] for the L1 beacon node. - /// - A [ReqwestProvider] for the L2 node. - pub async fn create_providers( - &self, - ) -> Result<(ReqwestProvider, OnlineBlobProvider, ReqwestProvider)> { - let blob_provider = OnlineBlobProvider::new_http( - self.l1_beacon_address.clone().ok_or(anyhow!("Beacon API URL must be set"))?, - ) - .await - .map_err(|e| anyhow!("Failed to load blob provider configuration: {e}"))?; - let l1_provider = Self::http_provider( - self.l1_node_address.as_ref().ok_or(anyhow!("Provider must be set"))?, - ); - let l2_provider = Self::http_provider( - self.l2_node_address.as_ref().ok_or(anyhow!("L2 node address must be set"))?, - ); - - Ok((l1_provider, blob_provider, l2_provider)) - } - - /// Parses the CLI arguments and returns a new instance of a [SharedKeyValueStore], as it is - /// configured to be created. - pub fn construct_kv_store(&self) -> SharedKeyValueStore { - let local_kv_store = LocalKeyValueStore::new(self.clone()); - - let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = self.data_dir { - let disk_kv_store = DiskKeyValueStore::new(data_dir.clone()); - let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store); - Arc::new(RwLock::new(split_kv_store)) - } else { - let mem_kv_store = MemoryKeyValueStore::new(); - let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store); - Arc::new(RwLock::new(split_kv_store)) - }; - - kv_store - } - /// Reads the [RollupConfig] from the file system and returns it as a string. pub fn read_rollup_config(&self) -> Result<RollupConfig> { let path = self.rollup_config_path.as_ref().ok_or_else(|| { @@ -200,18 +117,6 @@ impl SingleChainHostCli { } } -/// Styles for the CLI application.
-const fn cli_styles() -> clap::builder::Styles { - clap::builder::Styles::styled() - .usage(Style::new().bold().underline().fg_color(Some(Color::Ansi(AnsiColor::Yellow)))) - .header(Style::new().bold().underline().fg_color(Some(Color::Ansi(AnsiColor::Yellow)))) - .literal(Style::new().fg_color(Some(Color::Ansi(AnsiColor::Green)))) - .invalid(Style::new().bold().fg_color(Some(Color::Ansi(AnsiColor::Red)))) - .error(Style::new().bold().fg_color(Some(Color::Ansi(AnsiColor::Red)))) - .valid(Style::new().bold().underline().fg_color(Some(Color::Ansi(AnsiColor::Green)))) - .placeholder(Style::new().fg_color(Some(Color::Ansi(AnsiColor::White)))) -} - #[cfg(test)] mod test { use crate::single::SingleChainHostCli; diff --git a/bin/host/src/single/fetcher.rs b/bin/host/src/single/fetcher.rs index 98b51c88..39e5f3ce 100644 --- a/bin/host/src/single/fetcher.rs +++ b/bin/host/src/single/fetcher.rs @@ -1,7 +1,7 @@ //! This module contains the [SingleChainFetcher] struct, which is responsible for fetching //! preimages from a remote source serving the single-chain proof mode. -use crate::{eth::OnlineBlobProvider, kv::KeyValueStore}; +use crate::eth::OnlineBlobProvider; use alloy_consensus::{Header, TxEnvelope, EMPTY_ROOT_HASH}; use alloy_eips::{ eip2718::Encodable2718, @@ -16,6 +16,7 @@ use alloy_rpc_types::{ Transaction, }; use anyhow::{anyhow, Result}; +use kona_host::KeyValueStore; use kona_preimage::{ errors::{PreimageOracleError, PreimageOracleResult}, HintRouter, PreimageFetcher, PreimageKey, PreimageKeyType, diff --git a/bin/host/src/single/local_kv.rs b/bin/host/src/single/local_kv.rs index d1b0de05..05b4e718 100644 --- a/bin/host/src/single/local_kv.rs +++ b/bin/host/src/single/local_kv.rs @@ -2,9 +2,9 @@ //! using the [SingleChainHostCli] config. use super::SingleChainHostCli; -use crate::kv::KeyValueStore; use alloy_primitives::B256; use anyhow::Result; +use kona_host::KeyValueStore; use kona_preimage::PreimageKey; use kona_proof::boot::{ L1_HEAD_KEY, L2_CHAIN_ID_KEY, L2_CLAIM_BLOCK_NUMBER_KEY, L2_CLAIM_KEY, L2_OUTPUT_ROOT_KEY, diff --git a/bin/host/src/single/mod.rs b/bin/host/src/single/mod.rs index 676b3c99..b363dd41 100644 --- a/bin/host/src/single/mod.rs +++ b/bin/host/src/single/mod.rs @@ -1,118 +1,13 @@ //! This module contains the single-chain mode for the host. -use crate::{kv::KeyValueStore, server::PreimageServer}; -use anyhow::Result; -use kona_preimage::{ - BidirectionalChannel, HintReader, HintWriter, NativeChannel, OracleReader, OracleServer, -}; -use kona_std_fpvm::{FileChannel, FileDescriptor}; -use std::sync::Arc; -use tokio::{sync::RwLock, task}; -use tracing::info; - mod cli; pub use cli::SingleChainHostCli; +mod orchestrator; +pub use orchestrator::SingleChainProviders; + mod local_kv; pub use local_kv::LocalKeyValueStore; mod fetcher; pub use fetcher::SingleChainFetcher; - -/// Starts the [PreimageServer] in the primary thread. In this mode, the host program has been -/// invoked by the Fault Proof VM and the client program is running in the parent process. 
-pub async fn start_server(cfg: SingleChainHostCli) -> Result<()> { - let (preimage_chan, hint_chan) = ( - FileChannel::new(FileDescriptor::PreimageRead, FileDescriptor::PreimageWrite), - FileChannel::new(FileDescriptor::HintRead, FileDescriptor::HintWrite), - ); - let oracle_server = OracleServer::new(preimage_chan); - let hint_reader = HintReader::new(hint_chan); - let kv_store = cfg.construct_kv_store(); - let fetcher = if !cfg.is_offline() { - let (l1_provider, blob_provider, l2_provider) = cfg.create_providers().await?; - Some(Arc::new(RwLock::new(SingleChainFetcher::new( - kv_store.clone(), - l1_provider, - blob_provider, - l2_provider, - cfg.agreed_l2_head_hash, - )))) - } else { - None - }; - - // Start the server and wait for it to complete. - info!("Starting preimage server."); - PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher).start().await?; - info!("Preimage server has exited."); - - Ok(()) -} - -/// Starts the [PreimageServer] and the client program in separate threads. The client program is -/// ran natively in this mode. -/// -/// ## Takes -/// - `cfg`: The host configuration. -/// -/// ## Returns -/// - `Ok(exit_code)` if the client program exits successfully. -/// - `Err(_)` if the client program failed to execute, was killed by a signal, or the host program -/// exited first. -pub async fn start_server_and_native_client(cfg: SingleChainHostCli) -> Result<i32> { - let hint_chan = BidirectionalChannel::new()?; - let preimage_chan = BidirectionalChannel::new()?; - let kv_store = cfg.construct_kv_store(); - let fetcher = if !cfg.is_offline() { - let (l1_provider, blob_provider, l2_provider) = cfg.create_providers().await?; - Some(Arc::new(RwLock::new(SingleChainFetcher::new( - kv_store.clone(), - l1_provider, - blob_provider, - l2_provider, - cfg.agreed_l2_head_hash, - )))) - } else { - None - }; - - // Create the server and start it. - let server_task = task::spawn(start_native_preimage_server( - kv_store, - fetcher, - hint_chan.host, - preimage_chan.host, - )); - - // Start the client program in a separate child process. - let program_task = task::spawn(kona_client::single::run( - OracleReader::new(preimage_chan.client), - HintWriter::new(hint_chan.client), - None, - )); - - // Execute both tasks and wait for them to complete. - info!("Starting preimage server and client program."); - let (_, client_result) = tokio::try_join!(server_task, program_task,)?; - info!(target: "kona_host", "Preimage server and client program have joined."); - - Ok(client_result.is_err() as i32) -} - -/// Starts the preimage server in a separate thread. The client program is ran natively in this -/// mode. -pub async fn start_native_preimage_server( - kv_store: Arc<RwLock<KV>>, - fetcher: Option<Arc<RwLock<SingleChainFetcher<KV>>>>, - hint_chan: NativeChannel, - preimage_chan: NativeChannel, -) -> Result<()> -where - KV: KeyValueStore + Send + Sync + ?Sized + 'static, -{ - let hint_reader = HintReader::new(hint_chan); - let oracle_server = OracleServer::new(preimage_chan); - - PreimageServer::new(oracle_server, hint_reader, kv_store, fetcher).start().await -} diff --git a/bin/host/src/single/orchestrator.rs b/bin/host/src/single/orchestrator.rs new file mode 100644 index 00000000..56e1841a --- /dev/null +++ b/bin/host/src/single/orchestrator.rs @@ -0,0 +1,95 @@ +//! [SingleChainHostCli]'s [HostOrchestrator] + [DetachedHostOrchestrator] implementations.
+ +use super::{LocalKeyValueStore, SingleChainFetcher, SingleChainHostCli}; +use crate::eth::{http_provider, OnlineBlobProvider}; +use alloy_provider::ReqwestProvider; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use kona_host::{ + DetachedHostOrchestrator, DiskKeyValueStore, Fetcher, HostOrchestrator, MemoryKeyValueStore, + SharedKeyValueStore, SplitKeyValueStore, }; +use kona_preimage::{HintWriter, NativeChannel, OracleReader}; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// The providers required for the single chain host. +#[derive(Debug)] +pub struct SingleChainProviders { + /// The L1 EL provider. + l1_provider: ReqwestProvider, + /// The L1 beacon node provider. + blob_provider: OnlineBlobProvider, + /// The L2 EL provider. + l2_provider: ReqwestProvider, +} + +#[async_trait] +impl HostOrchestrator for SingleChainHostCli { + type Providers = SingleChainProviders; + + async fn create_providers(&self) -> Result<Option<Self::Providers>> { + if self.is_offline() { + return Ok(None); + } + + let blob_provider = OnlineBlobProvider::new_http( + self.l1_beacon_address.clone().ok_or(anyhow!("Beacon API URL must be set"))?, + ) + .await + .map_err(|e| anyhow!("Failed to load blob provider configuration: {e}"))?; + let l1_provider = + http_provider(self.l1_node_address.as_ref().ok_or(anyhow!("Provider must be set"))?); + let l2_provider = http_provider( + self.l2_node_address.as_ref().ok_or(anyhow!("L2 node address must be set"))?, + ); + + Ok(Some(SingleChainProviders { l1_provider, blob_provider, l2_provider })) + } + + fn create_fetcher( + &self, + providers: Option<Self::Providers>, + kv_store: SharedKeyValueStore, + ) -> Option<Arc<RwLock<dyn Fetcher + Send + Sync>>> { + providers.map(|providers| { + Arc::new(RwLock::new(SingleChainFetcher::new( + kv_store, + providers.l1_provider, + providers.blob_provider, + providers.l2_provider, + self.agreed_l2_head_hash, + ))) + }) + } + + fn create_key_value_store(&self) -> Result<SharedKeyValueStore> { + let local_kv_store = LocalKeyValueStore::new(self.clone()); + + let kv_store: SharedKeyValueStore = if let Some(ref data_dir) = self.data_dir { + let disk_kv_store = DiskKeyValueStore::new(data_dir.clone()); + let split_kv_store = SplitKeyValueStore::new(local_kv_store, disk_kv_store); + Arc::new(RwLock::new(split_kv_store)) + } else { + let mem_kv_store = MemoryKeyValueStore::new(); + let split_kv_store = SplitKeyValueStore::new(local_kv_store, mem_kv_store); + Arc::new(RwLock::new(split_kv_store)) + }; + + Ok(kv_store) + } + + async fn run_client_native( + hint_reader: HintWriter<NativeChannel>, + oracle_reader: OracleReader<NativeChannel>, + ) -> Result<()> { + kona_client::single::run(oracle_reader, hint_reader, None).await.map_err(Into::into) + } +} + +#[async_trait] +impl DetachedHostOrchestrator for SingleChainHostCli { + fn is_detached(&self) -> bool { + self.server + } +} diff --git a/build/asterisc/asterisc-repro.dockerfile b/build/asterisc/asterisc-repro.dockerfile index 38cc432a..1dd7e52f 100644 --- a/build/asterisc/asterisc-repro.dockerfile +++ b/build/asterisc/asterisc-repro.dockerfile @@ -46,7 +46,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends git # Build kona-client on the selected tag RUN git checkout $CLIENT_TAG && \ - cargo build -Zbuild-std=core,alloc --workspace --bin kona --locked --profile release-client-lto --exclude kona-host --exclude kona-derive-alloy && \ + cargo build -Zbuild-std=core,alloc --workspace --bin kona --locked --profile release-client-lto --exclude kona-host && \ mv ./target/riscv64imac-unknown-none-elf/release-client-lto/kona /kona-client-elf
################################################################