diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index 7c4231e0253..e4e4d8f8725 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -432,6 +432,13 @@ impl Service { } } + pub async fn run(self) -> Result<()> { + self.run_inner().await.map_err(|err| { + log::error!("Service finished work with error: {err:?}"); + err + }) + } + async fn run_inner(self) -> Result<()> { let Service { db, @@ -591,13 +598,6 @@ impl Service { Ok(()) } - pub async fn run(self) -> Result<()> { - self.run_inner().await.map_err(|err| { - log::error!("Service finished work with error: {:?}", err); - err - }) - } - async fn post_process_commitments( code_commitments: Vec, block_commitments: Vec, diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index e03239350cc..4c32678a2bd 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -501,10 +501,6 @@ async fn ping_reorg() { assert_eq!(res.program_id, ping_id); assert_eq!(res.reply_payload, b"PONG"); - // Await for service block with user reply handling - // TODO: this is for better logs reading only, should find a better solution #4099 - tokio::time::sleep(env.block_time).await; - log::info!("📗 Test after reverting to the program creation snapshot"); provider .anvil_revert(program_created_snapshot_id) @@ -546,10 +542,6 @@ async fn ping_reorg() { let res = send_message.wait_for().await.unwrap(); assert_eq!(res.program_id, ping_id); assert_eq!(res.reply_payload, b"PONG"); - - // Await for service block with user reply handling - // TODO: this is for better logs reading only, should find a better solution #4099 - tokio::time::sleep(Duration::from_secs(1)).await; } // Mine 150 blocks - send message - mine 150 blocks. @@ -1354,7 +1346,6 @@ mod utils { None, None, ); - let handle = task::spawn(service.run()); self.running_service_handle = Some(handle); diff --git a/ethexe/ethereum/src/lib.rs b/ethexe/ethereum/src/lib.rs index 3243dbffcee..76b268df6f3 100644 --- a/ethexe/ethereum/src/lib.rs +++ b/ethexe/ethereum/src/lib.rs @@ -300,7 +300,9 @@ impl TryGetReceipt for PendingTransactio Err(err) => err, }; - for _ in 0..3 { + log::trace!("Failed to get transaction receipt for {tx_hash}. Retrying..."); + for n in 0..3 { + log::trace!("Attempt {n}. Error - {err}"); match err { PendingTransactionError::TransportError(RpcError::NullResp) => {} _ => break, diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 910d068fcd0..3fa8d2dbbed 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -5,7 +5,8 @@ use crate::{ use alloy::{ primitives::Address as AlloyAddress, providers::{Provider, ProviderBuilder, RootProvider}, - rpc::types::eth::{Filter, Topic}, + pubsub::Subscription, + rpc::types::eth::{Block, Filter, Topic}, transports::BoxTransport, }; use anyhow::{anyhow, Result}; @@ -28,15 +29,22 @@ pub(crate) const MAX_QUERY_BLOCK_RANGE: u64 = 100_000; pub(crate) type ObserverProvider = RootProvider; -#[derive(Clone)] pub struct Observer { provider: ObserverProvider, router_address: AlloyAddress, + // Always `Some` + blocks_subscription: Option>, blob_reader: Arc, status_sender: watch::Sender, status: ObserverStatus, } +impl Clone for Observer { + fn clone(&self) -> Self { + self.clone_with_resubscribe() + } +} + #[derive(Debug, Clone, Copy, Default)] pub struct ObserverStatus { pub eth_block_number: u64, @@ -44,6 +52,97 @@ pub struct ObserverStatus { pub last_router_state: u64, } +macro_rules! define_event_stream_method { + ( + $method_name:ident, + $read_events_fn:ident, + $block_event_type:ty, + $router_event_type:ty, + $block_data_type:ty, + $event_type:ty + ) => { + pub fn $method_name(&mut self) -> impl Stream + '_ { + let new_subscription = self.resubscribe_blocks(); + let old_subscription = self.blocks_subscription.take().expect("always some"); + self.blocks_subscription = Some(new_subscription); + async_stream::stream! { + let mut block_stream = old_subscription.into_stream(); + let mut futures = FuturesUnordered::new(); + + loop { + tokio::select! { + block = block_stream.next() => { + let Some(block) = block else { + log::info!("Block stream ended"); + break; + }; + + log::trace!("Received block: {:?}", block.header.hash); + + let block_hash = (*block.header.hash).into(); + let parent_hash = (*block.header.parent_hash).into(); + let block_number = block.header.number; + let block_timestamp = block.header.timestamp; + + let events = match $read_events_fn(block_hash, &self.provider, self.router_address).await { + Ok(events) => events, + Err(err) => { + log::error!("failed to read events: {err}"); + continue; + } + }; + + let mut codes_len = 0; + + for event in events.iter() { + if let $block_event_type::Router($router_event_type::CodeValidationRequested { code_id, blob_tx_hash }) = event { + codes_len += 1; + + let blob_reader = self.blob_reader.clone(); + let code_id = *code_id; + let blob_tx_hash = *blob_tx_hash; + + futures.push(async move { + let attempts = Some(3); + read_code_from_tx_hash(blob_reader, code_id, blob_tx_hash, attempts).await + }); + } + } + + self.update_status(|status| { + status.eth_block_number = block_number; + if codes_len > 0 { + status.last_router_state = block_number; + } + status.pending_upload_code = codes_len as u64; + }); + + let block_data = $block_data_type { + hash: block_hash, + header: BlockHeader { + height: block_number as u32, + timestamp: block_timestamp, + parent_hash, + }, + events, + }; + + yield $event_type::Block(block_data); + }, + future = futures.next(), if !futures.is_empty() => { + match future { + Some(Ok((code_id, code))) => yield $event_type::CodeLoaded { code_id, code }, + Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), + None => continue, + } + } + }; + } + } + } + } +} + impl Observer { pub async fn new( ethereum_rpc: &str, @@ -51,9 +150,12 @@ impl Observer { blob_reader: Arc, ) -> Result { let (status_sender, _status_receiver) = watch::channel(ObserverStatus::default()); + let provider = ProviderBuilder::new().on_builtin(ethereum_rpc).await?; + let blocks_subscription = provider.subscribe_blocks().await?; Ok(Self { - provider: ProviderBuilder::new().on_builtin(ethereum_rpc).await?, + provider, router_address: AlloyAddress::new(router_address.0), + blocks_subscription: Some(blocks_subscription), blob_reader, status: Default::default(), status_sender, @@ -71,189 +173,51 @@ impl Observer { update_fn(&mut self.status); let _ = self.status_sender.send_replace(self.status); } + pub fn provider(&self) -> &ObserverProvider { &self.provider } - pub fn events_all(&mut self) -> impl Stream + '_ { - async_stream::stream! { - let block_subscription = self - .provider - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks"); - let mut block_stream = block_subscription.into_stream(); - let mut futures = FuturesUnordered::new(); - - loop { - tokio::select! { - block = block_stream.next() => { - let Some(block) = block else { - log::info!("Block stream ended"); - break; - }; - - log::trace!("Received block: {:?}", block.header.hash); - - let block_hash = (*block.header.hash).into(); - let parent_hash = (*block.header.parent_hash).into(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - - let events = match read_block_events(block_hash, &self.provider, self.router_address).await { - Ok(events) => events, - Err(err) => { - log::error!("failed to read events: {err}"); - continue; - } - }; - - let mut codes_len = 0; - - // Create futures to load codes - for event in events.iter() { - if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { - codes_len += 1; - - let blob_reader = self.blob_reader.clone(); - - let code_id = *code_id; - let blob_tx_hash = *blob_tx_hash; - - futures.push(async move { - let attempts = Some(3); + define_event_stream_method!( + events_all, + read_block_events, + BlockEvent, + RouterEvent, + BlockData, + Event + ); - read_code_from_tx_hash( - blob_reader, - code_id, - blob_tx_hash, - attempts, - ).await - }); - } - } + define_event_stream_method!( + request_events, + read_block_request_events, + BlockRequestEvent, + RouterRequestEvent, + RequestBlockData, + RequestEvent + ); - self.update_status(|status| { - status.eth_block_number = block_number; - if codes_len > 0 { - status.last_router_state = block_number; - } - status.pending_upload_code = codes_len as u64; - }); - - let block_data = BlockData { - hash: block_hash, - header: BlockHeader { - height: block_number as u32, - timestamp: block_timestamp, - parent_hash, - }, - events, - }; - - yield Event::Block(block_data); - }, - future = futures.next(), if !futures.is_empty() => { - match future { - Some(Ok((code_id, code))) => yield Event::CodeLoaded { code_id, code }, - Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), - None => continue, - } - } - }; - } + /// Clones the `Observer` with resubscribing to blocks. + /// + /// Resubscription here is the same as calling provider's `subscibe_blocks` + /// method from the sense of both approaches will result in receiving only new blocks. + /// All the previous blocks queued in the inner channel of the subscription won't be + /// accessible by the new subscription. + pub fn clone_with_resubscribe(&self) -> Self { + Self { + provider: self.provider.clone(), + router_address: self.router_address, + blocks_subscription: Some(self.resubscribe_blocks()), + blob_reader: self.blob_reader.clone(), + status_sender: self.status_sender.clone(), + status: self.status, } } - pub fn request_events(&mut self) -> impl Stream + '_ { - async_stream::stream! { - let block_subscription = self - .provider - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks"); - let mut block_stream = block_subscription.into_stream(); - let mut futures = FuturesUnordered::new(); - - loop { - tokio::select! { - block = block_stream.next() => { - let Some(block) = block else { - log::info!("Block stream ended"); - break; - }; - - log::trace!("Received block: {:?}", block.header.hash); - - let block_hash = (*block.header.hash).into(); - let parent_hash = (*block.header.parent_hash).into(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - - let events = match read_block_request_events(block_hash, &self.provider, self.router_address).await { - Ok(events) => events, - Err(err) => { - log::error!("failed to read events: {err}"); - continue; - } - }; - - let mut codes_len = 0; - - // Create futures to load codes - // TODO (breathx): remove me from here mb - for event in events.iter() { - if let BlockRequestEvent::Router(RouterRequestEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { - codes_len += 1; + fn resubscribe_blocks(&self) -> Subscription { + // `expect` is called to state the invariant` that `blocks_subscription` is always `Some`. + let subscription_ref = self.blocks_subscription.as_ref().expect("always some"); - let blob_reader = self.blob_reader.clone(); - - let code_id = *code_id; - let blob_tx_hash = *blob_tx_hash; - - futures.push(async move { - let attempts = Some(3); - - read_code_from_tx_hash( - blob_reader, - code_id, - blob_tx_hash, - attempts, - ).await - }); - } - } - - self.update_status(|status| { - status.eth_block_number = block_number; - if codes_len > 0 { - status.last_router_state = block_number; - } - status.pending_upload_code = codes_len as u64; - }); - - let block_data = RequestBlockData { - hash: block_hash, - header: BlockHeader { - height: block_number as u32, - timestamp: block_timestamp, - parent_hash, - }, - events, - }; - - yield RequestEvent::Block(block_data); - }, - future = futures.next(), if !futures.is_empty() => { - match future { - Some(Ok((code_id, code))) => yield RequestEvent::CodeLoaded { code_id, code }, - Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), - None => continue, - } - } - }; - } - } + subscription_ref.resubscribe() } }