From 230bd9479852310f23839a8b41e52e58d1112d86 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Thu, 24 Oct 2024 18:38:14 +0300 Subject: [PATCH 1/8] Introduce two-step service run Add docs --- ethexe/cli/src/service.rs | 115 +++++++++++++++++++++++++------- ethexe/cli/src/tests.rs | 19 ++---- ethexe/observer/src/lib.rs | 2 +- ethexe/observer/src/observer.rs | 107 ++++++++++++++++++++++------- 4 files changed, 180 insertions(+), 63 deletions(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index a418816637b..22645d63231 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -32,7 +32,7 @@ use ethexe_common::{ use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_ethereum::router::RouterQuery; use ethexe_network::NetworkReceiverEvent; -use ethexe_observer::{RequestBlockData, RequestEvent}; +use ethexe_observer::{EventsStreamProducer, RequestBlockData, RequestEvent}; use ethexe_processor::LocalOutcome; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; @@ -51,7 +51,9 @@ use utils::*; /// ethexe service. pub struct Service { db: Database, - observer: ethexe_observer::Observer, + // Option used to enable "taking" observer from it when calling + // Service methods by `self`. Contract: always `Some`. + observer: Option, query: ethexe_observer::Query, processor: ethexe_processor::Processor, signer: ethexe_signer::Signer, @@ -184,7 +186,7 @@ impl Service { Ok(Self { db, network, - observer, + observer: Some(observer), query, processor, sequencer, @@ -221,7 +223,7 @@ impl Service { ) -> Self { Self { db, - observer, + observer: Some(observer), query, processor, signer, @@ -398,29 +400,45 @@ impl Service { } } - async fn run_inner(self) -> Result<()> { + #[cfg(test)] + pub async fn pending_run(self) -> ServicePendingRun { + ServicePendingRun::new(self).await + } + + pub async fn run(mut self) -> Result<()> { + let Some(observer) = self.observer.take() else { + unreachable!("Contract invalidation; qed.") + }; + + if let Some(metrics_service) = self.metrics_service.take() { + tokio::spawn(metrics_service.run( + observer.get_status_receiver(), + self.sequencer.as_mut().map(|s| s.get_status_receiver()), + )); + } + let events_stream_producer = observer.events_stream_producer(); + + self.run_inner(events_stream_producer).await.map_err(|err| { + log::error!("Service finished work with error: {:?}", err); + err + }) + } + + async fn run_inner(self, events_stream_producer: EventsStreamProducer) -> Result<()> { let Service { db, network, - mut observer, + mut sequencer, mut query, mut processor, - mut sequencer, - signer: _signer, mut validator, - metrics_service, rpc, block_time, + signer: _signer, + .. } = self; - if let Some(metrics_service) = metrics_service { - tokio::spawn(metrics_service.run( - observer.get_status_receiver(), - sequencer.as_mut().map(|s| s.get_status_receiver()), - )); - } - - let observer_events = observer.request_events(); + let observer_events = events_stream_producer.request_events(); futures::pin_mut!(observer_events); let (mut network_sender, mut network_receiver, mut network_handle) = @@ -540,13 +558,6 @@ impl Service { Ok(()) } - pub async fn run(self) -> Result<()> { - self.run_inner().await.map_err(|err| { - log::error!("Service finished work with error: {:?}", err); - err - }) - } - async fn post_process_commitments( code_commitments: Vec, block_commitments: Vec, @@ -783,6 +794,62 @@ impl Service { } } +/// The type was introduced as a solution to the issue#4099. +/// +/// Basically, it splits the events stream creation into two steps: +/// 1) blocks subscription and 2) actually obtaining events in the loop. +/// +/// Usually blocks subscription is done within `async_stream::stream!` in which +/// the events obtaining loop is actually defined. This is a default design and +/// it's too slow as subscription to blocks is scheduled when the async stream is +/// first time polled. +#[cfg(test)] +pub struct ServicePendingRun { + service: Service, + events_stream_producer: EventsStreamProducer, +} + +#[cfg(test)] +impl ServicePendingRun { + async fn new(mut service: Service) -> Self { + let Some(observer) = service.observer.take() else { + unreachable!("Contract invalidation; qed.") + }; + + if let Some(metrics_service) = service.metrics_service.take() { + tokio::spawn(metrics_service.run( + observer.get_status_receiver(), + service.sequencer.as_mut().map(|s| s.get_status_receiver()), + )); + } + + let events_stream_producer = observer + .events_stream_producer() + .with_blocks_subscribed_first() + .await; + + Self { + service, + events_stream_producer, + } + } + + pub async fn complete_run(self) -> Result<()> { + let Self { + service, + events_stream_producer, + } = self; + + service + .run_inner(events_stream_producer) + .await + .map_err(|err| { + log::error!("Service finished work with error: {:?}", err); + err + }) + } +} + mod utils { use super::*; diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index a15a9f1fda9..74f928715fc 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -496,10 +496,6 @@ async fn ping_reorg() { assert_eq!(res.program_id, ping_id); assert_eq!(res.reply_payload, b"PONG"); - // Await for service block with user reply handling - // TODO: this is for better logs reading only, should find a better solution #4099 - tokio::time::sleep(env.block_time).await; - log::info!("📗 Test after reverting to the program creation snapshot"); provider .anvil_revert(program_created_snapshot_id) @@ -541,10 +537,6 @@ async fn ping_reorg() { let res = send_message.wait_for().await.unwrap(); assert_eq!(res.program_id, ping_id); assert_eq!(res.reply_payload, b"PONG"); - - // Await for service block with user reply handling - // TODO: this is for better logs reading only, should find a better solution #4099 - tokio::time::sleep(Duration::from_secs(1)).await; } // Mine 150 blocks - send message - mine 150 blocks. @@ -862,7 +854,7 @@ mod utils { .expect("failed to create observer"); let (broadcaster, _events_stream) = { - let mut observer = observer.clone(); + let observer = observer.clone(); let (sender, mut receiver) = tokio::sync::broadcast::channel::(2048); let sender = Arc::new(Mutex::new(sender)); let cloned_sender = sender.clone(); @@ -870,7 +862,7 @@ mod utils { let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let handle = task::spawn(async move { - let observer_events = observer.events_all(); + let observer_events = observer.events_stream_producer().events_all(); futures::pin_mut!(observer_events); send_subscription_created.send(()).unwrap(); @@ -1331,12 +1323,9 @@ mod utils { None, None, ); - - let handle = task::spawn(service.run()); + let service_pending_run = service.pending_run().await; + let handle = task::spawn(service_pending_run.complete_run()); self.running_service_handle = Some(handle); - - // Sleep to wait for the new service to start - tokio::time::sleep(Duration::from_secs(1)).await; } pub async fn stop_service(&mut self) { diff --git a/ethexe/observer/src/lib.rs b/ethexe/observer/src/lib.rs index 3a9e1a23e92..5a5524194eb 100644 --- a/ethexe/observer/src/lib.rs +++ b/ethexe/observer/src/lib.rs @@ -25,5 +25,5 @@ mod query; pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader}; pub use event::{BlockData, Event, RequestBlockData, RequestEvent, SimpleBlockData}; -pub use observer::{Observer, ObserverStatus}; +pub use observer::{EventsStreamProducer, Observer, ObserverStatus}; pub use query::Query; diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index f4070aef6fb..87f5c295c4f 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -5,7 +5,8 @@ use crate::{ use alloy::{ primitives::Address as AlloyAddress, providers::{Provider, ProviderBuilder, RootProvider}, - rpc::types::eth::{Filter, Topic}, + pubsub::Subscription, + rpc::types::eth::{Block, Filter, Topic}, transports::BoxTransport, }; use anyhow::{anyhow, Result}; @@ -74,18 +75,71 @@ impl Observer { update_fn(&mut self.status); let _ = self.status_sender.send_replace(self.status); } + pub fn provider(&self) -> &ObserverProvider { &self.provider } - pub fn events_all(&mut self) -> impl Stream + '_ { + /// Returns events stream producer, which produces a stream for polling events + /// in observed blocks. + pub fn events_stream_producer(self) -> EventsStreamProducer { + EventsStreamProducer::new(self) + } + + async fn subscribe_blocks(&self) -> Result> { + self.provider.subscribe_blocks().await.map_err(Into::into) + } +} + +/// Events stream producer. +/// +/// Produces events stream that yields obtained from the observed chain blocks. +pub struct EventsStreamProducer { + observer: Observer, + maybe_subscription: Option>, +} + +impl EventsStreamProducer { + /// Creates an events stream producer from the `observer`. + pub fn new(observer: Observer) -> Self { + Self { + observer, + maybe_subscription: None, + } + } + + /// Subscribes to blocks and stores the subscription to be used + /// when events stream is created. + /// + /// For more info read `ethexe_cli::service::ServicePendingRun` docs. + pub async fn with_blocks_subscribed_first(self) -> Self { + let subscription = self + .observer + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks"); + + Self { + observer: self.observer, + maybe_subscription: Some(subscription), + } + } + + /// Returs stream for polling events. + pub fn events_all(self) -> impl Stream + 'static { + let Self { + mut observer, + maybe_subscription, + } = self; async_stream::stream! { - let block_subscription = self - .provider - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks"); - let mut block_stream = block_subscription.into_stream(); + let blocks_subscription = match maybe_subscription { + Some(blocks_subscription) => blocks_subscription, + None => observer + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks") + }; + let mut block_stream = blocks_subscription.into_stream(); let mut futures = FuturesUnordered::new(); loop { @@ -103,7 +157,7 @@ impl Observer { let block_number = block.header.number; let block_timestamp = block.header.timestamp; - let events = match read_block_events(block_hash, &self.provider, self.router_address).await { + let events = match read_block_events(block_hash, &observer.provider, observer.router_address).await { Ok(events) => events, Err(err) => { log::error!("failed to read events: {err}"); @@ -118,7 +172,7 @@ impl Observer { if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { codes_len += 1; - let blob_reader = self.blob_reader.clone(); + let blob_reader = observer.blob_reader.clone(); let code_id = *code_id; let blob_tx_hash = *blob_tx_hash; @@ -136,7 +190,7 @@ impl Observer { } } - self.update_status(|status| { + observer.update_status(|status| { status.eth_block_number = block_number; if codes_len > 0 { status.last_router_state = block_number; @@ -168,14 +222,21 @@ impl Observer { } } - pub fn request_events(&mut self) -> impl Stream + '_ { + /// Returs stream for polling request events. + pub fn request_events(self) -> impl Stream + 'static { + let Self { + mut observer, + maybe_subscription, + } = self; async_stream::stream! { - let block_subscription = self - .provider - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks"); - let mut block_stream = block_subscription.into_stream(); + let blocks_subscription = match maybe_subscription { + Some(blocks_subscription) => blocks_subscription, + None => observer + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks") + }; + let mut block_stream = blocks_subscription.into_stream(); let mut futures = FuturesUnordered::new(); loop { @@ -193,7 +254,7 @@ impl Observer { let block_number = block.header.number; let block_timestamp = block.header.timestamp; - let events = match read_block_request_events(block_hash, &self.provider, self.router_address).await { + let events = match read_block_request_events(block_hash, &observer.provider, observer.router_address).await { Ok(events) => events, Err(err) => { log::error!("failed to read events: {err}"); @@ -209,7 +270,7 @@ impl Observer { if let BlockRequestEvent::Router(RouterRequestEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { codes_len += 1; - let blob_reader = self.blob_reader.clone(); + let blob_reader = observer.blob_reader.clone(); let code_id = *code_id; let blob_tx_hash = *blob_tx_hash; @@ -227,7 +288,7 @@ impl Observer { } } - self.update_status(|status| { + observer.update_status(|status| { status.eth_block_number = block_number; if codes_len > 0 { status.last_router_state = block_number; @@ -563,11 +624,11 @@ mod tests { let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let handle = task::spawn(async move { - let mut observer = Observer::new(ðereum_rpc, router_address, cloned_blob_reader) + let observer = Observer::new(ðereum_rpc, router_address, cloned_blob_reader) .await .expect("failed to create observer"); - let observer_events = observer.events_all(); + let observer_events = observer.events_stream_producer().events_all(); futures::pin_mut!(observer_events); send_subscription_created.send(()).unwrap(); From 7cdd9af7980d314349d4bb233035e4f3c3e71cbd Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Mon, 28 Oct 2024 17:01:10 +0300 Subject: [PATCH 2/8] Add macro to avoid copy-paste --- ethexe/observer/src/observer.rs | 308 ++++++++++++-------------------- 1 file changed, 115 insertions(+), 193 deletions(-) diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 87f5c295c4f..a9d0330b093 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -91,6 +91,105 @@ impl Observer { } } +macro_rules! define_event_stream_method { + ( + $method_name:ident, + $read_events_fn:ident, + $block_event_type:ty, + $router_event_type:ty, + $block_data_type:ty, + $event_type:ty + ) => { + pub fn $method_name(self) -> impl Stream + 'static { + let Self { + mut observer, + maybe_subscription, + } = self; + async_stream::stream! { + let blocks_subscription = match maybe_subscription { + Some(subscription) => subscription, + None => observer + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks"), + }; + let mut block_stream = blocks_subscription.into_stream(); + let mut futures = FuturesUnordered::new(); + + loop { + tokio::select! { + block = block_stream.next() => { + let Some(block) = block else { + log::info!("Block stream ended"); + break; + }; + + log::trace!("Received block: {:?}", block.header.hash); + + let block_hash = (*block.header.hash).into(); + let parent_hash = (*block.header.parent_hash).into(); + let block_number = block.header.number; + let block_timestamp = block.header.timestamp; + + let events = match $read_events_fn(block_hash, &observer.provider, observer.router_address).await { + Ok(events) => events, + Err(err) => { + log::error!("failed to read events: {err}"); + continue; + } + }; + + let mut codes_len = 0; + + for event in events.iter() { + if let $block_event_type::Router($router_event_type::CodeValidationRequested { code_id, blob_tx_hash }) = event { + codes_len += 1; + + let blob_reader = observer.blob_reader.clone(); + let code_id = *code_id; + let blob_tx_hash = *blob_tx_hash; + + futures.push(async move { + let attempts = Some(3); + read_code_from_tx_hash(blob_reader, code_id, blob_tx_hash, attempts).await + }); + } + } + + observer.update_status(|status| { + status.eth_block_number = block_number; + if codes_len > 0 { + status.last_router_state = block_number; + } + status.pending_upload_code = codes_len as u64; + }); + + let block_data = $block_data_type { + hash: block_hash, + header: BlockHeader { + height: block_number as u32, + timestamp: block_timestamp, + parent_hash, + }, + events, + }; + + yield $event_type::Block(block_data); + }, + future = futures.next(), if !futures.is_empty() => { + match future { + Some(Ok((code_id, code))) => yield $event_type::CodeLoaded { code_id, code }, + Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), + None => continue, + } + } + }; + } + } + } + } +} + /// Events stream producer. /// /// Produces events stream that yields obtained from the observed chain blocks. @@ -125,200 +224,23 @@ impl EventsStreamProducer { } } - /// Returs stream for polling events. - pub fn events_all(self) -> impl Stream + 'static { - let Self { - mut observer, - maybe_subscription, - } = self; - async_stream::stream! { - let blocks_subscription = match maybe_subscription { - Some(blocks_subscription) => blocks_subscription, - None => observer - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks") - }; - let mut block_stream = blocks_subscription.into_stream(); - let mut futures = FuturesUnordered::new(); - - loop { - tokio::select! { - block = block_stream.next() => { - let Some(block) = block else { - log::info!("Block stream ended"); - break; - }; - - log::trace!("Received block: {:?}", block.header.hash); - - let block_hash = (*block.header.hash).into(); - let parent_hash = (*block.header.parent_hash).into(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - - let events = match read_block_events(block_hash, &observer.provider, observer.router_address).await { - Ok(events) => events, - Err(err) => { - log::error!("failed to read events: {err}"); - continue; - } - }; - - let mut codes_len = 0; - - // Create futures to load codes - for event in events.iter() { - if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { - codes_len += 1; - - let blob_reader = observer.blob_reader.clone(); - - let code_id = *code_id; - let blob_tx_hash = *blob_tx_hash; - - futures.push(async move { - let attempts = Some(3); - - read_code_from_tx_hash( - blob_reader, - code_id, - blob_tx_hash, - attempts, - ).await - }); - } - } - - observer.update_status(|status| { - status.eth_block_number = block_number; - if codes_len > 0 { - status.last_router_state = block_number; - } - status.pending_upload_code = codes_len as u64; - }); - - let block_data = BlockData { - hash: block_hash, - header: BlockHeader { - height: block_number as u32, - timestamp: block_timestamp, - parent_hash, - }, - events, - }; - - yield Event::Block(block_data); - }, - future = futures.next(), if !futures.is_empty() => { - match future { - Some(Ok((code_id, code))) => yield Event::CodeLoaded { code_id, code }, - Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), - None => continue, - } - } - }; - } - } - } - - /// Returs stream for polling request events. - pub fn request_events(self) -> impl Stream + 'static { - let Self { - mut observer, - maybe_subscription, - } = self; - async_stream::stream! { - let blocks_subscription = match maybe_subscription { - Some(blocks_subscription) => blocks_subscription, - None => observer - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks") - }; - let mut block_stream = blocks_subscription.into_stream(); - let mut futures = FuturesUnordered::new(); - - loop { - tokio::select! { - block = block_stream.next() => { - let Some(block) = block else { - log::info!("Block stream ended"); - break; - }; - - log::trace!("Received block: {:?}", block.header.hash); - - let block_hash = (*block.header.hash).into(); - let parent_hash = (*block.header.parent_hash).into(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - - let events = match read_block_request_events(block_hash, &observer.provider, observer.router_address).await { - Ok(events) => events, - Err(err) => { - log::error!("failed to read events: {err}"); - continue; - } - }; - - let mut codes_len = 0; - - // Create futures to load codes - // TODO (breathx): remove me from here mb - for event in events.iter() { - if let BlockRequestEvent::Router(RouterRequestEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { - codes_len += 1; - - let blob_reader = observer.blob_reader.clone(); - - let code_id = *code_id; - let blob_tx_hash = *blob_tx_hash; - - futures.push(async move { - let attempts = Some(3); - - read_code_from_tx_hash( - blob_reader, - code_id, - blob_tx_hash, - attempts, - ).await - }); - } - } + define_event_stream_method!( + events_all, + read_block_events, + BlockEvent, + RouterEvent, + BlockData, + Event + ); - observer.update_status(|status| { - status.eth_block_number = block_number; - if codes_len > 0 { - status.last_router_state = block_number; - } - status.pending_upload_code = codes_len as u64; - }); - - let block_data = RequestBlockData { - hash: block_hash, - header: BlockHeader { - height: block_number as u32, - timestamp: block_timestamp, - parent_hash, - }, - events, - }; - - yield RequestEvent::Block(block_data); - }, - future = futures.next(), if !futures.is_empty() => { - match future { - Some(Ok((code_id, code))) => yield RequestEvent::CodeLoaded { code_id, code }, - Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), - None => continue, - } - } - }; - } - } - } + define_event_stream_method!( + request_events, + read_block_request_events, + BlockRequestEvent, + RouterRequestEvent, + RequestBlockData, + RequestEvent + ); } pub(crate) async fn read_code_from_tx_hash( From 6e450c46487c1b2a372002ad7b667cb973a38186 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 12 Nov 2024 12:26:55 +0300 Subject: [PATCH 3/8] Add timer --- ethexe/cli/src/tests.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index b7559dffe31..05b88103026 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -1334,6 +1334,9 @@ mod utils { let service_pending_run = service.pending_run().await; let handle = task::spawn(service_pending_run.complete_run()); self.running_service_handle = Some(handle); + + // Sleep to wait for the new service to start + tokio::time::sleep(Duration::from_secs(1)).await; } pub async fn stop_service(&mut self) { From 3174f9439943c60e94389f3d3f08cb0865aa7d19 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 12 Nov 2024 21:42:11 +0300 Subject: [PATCH 4/8] Trigger CI From a465d72aa3ceae68f3f5a8705bf56829f96ca095 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 12 Nov 2024 21:54:31 +0300 Subject: [PATCH 5/8] Remove timer, set block time, increase attempts amount --- ethexe/cli/src/tests.rs | 8 ++++---- ethexe/ethereum/src/lib.rs | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 05b88103026..06fb53b9117 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -824,7 +824,10 @@ mod utils { (rpc_url, None) } Err(_) => { - let anvil = Anvil::new().try_spawn().unwrap(); + let anvil = Anvil::new() + .block_time(block_time.as_secs()) + .try_spawn() + .unwrap(); log::info!("📍 Anvil started at {}", anvil.ws_endpoint()); (anvil.ws_endpoint(), Some(anvil)) } @@ -1334,9 +1337,6 @@ mod utils { let service_pending_run = service.pending_run().await; let handle = task::spawn(service_pending_run.complete_run()); self.running_service_handle = Some(handle); - - // Sleep to wait for the new service to start - tokio::time::sleep(Duration::from_secs(1)).await; } pub async fn stop_service(&mut self) { diff --git a/ethexe/ethereum/src/lib.rs b/ethexe/ethereum/src/lib.rs index 190fb01934e..5537926fc02 100644 --- a/ethexe/ethereum/src/lib.rs +++ b/ethexe/ethereum/src/lib.rs @@ -297,7 +297,9 @@ impl TryGetReceipt for PendingTransactio Err(err) => err, }; - for _ in 0..3 { + log::trace!("Failed to get transaction receipt for {tx_hash}. Retrying..."); + for n in 0..25 { + log::trace!("Attempt {n}. Error - {err}"); match err { PendingTransactionError::TransportError(RpcError::NullResp) => {} _ => break, From 49b8e2bd5c0b5ab6aea828be90be7e1d0d6cdd17 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 12 Nov 2024 22:11:08 +0300 Subject: [PATCH 6/8] Add TODO --- ethexe/observer/src/observer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index a9d0330b093..3eabe32ea9c 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -108,6 +108,7 @@ macro_rules! define_event_stream_method { async_stream::stream! { let blocks_subscription = match maybe_subscription { Some(subscription) => subscription, + // TODO #4335: always subscribe blocks when Observer is created. None => observer .subscribe_blocks() .await From fbcd57c28506ffe5a8da6d4a4175926e47c2b3ab Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Mon, 2 Dec 2024 13:11:20 +0300 Subject: [PATCH 7/8] Remove redundant, subscribe blocks on `Observer` creation --- ethexe/cli/src/service.rs | 105 ++++----------------- ethexe/cli/src/tests.rs | 15 ++- ethexe/ethereum/src/lib.rs | 2 +- ethexe/observer/src/lib.rs | 2 +- ethexe/observer/src/observer.rs | 162 ++++++++++++++------------------ 5 files changed, 99 insertions(+), 187 deletions(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index 1956e1d87c9..0af490b653c 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -30,7 +30,7 @@ use ethexe_common::{ use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_ethereum::{primitives::U256, router::RouterQuery}; use ethexe_network::{db_sync, NetworkReceiverEvent}; -use ethexe_observer::{EventsStreamProducer, RequestBlockData, RequestEvent}; +use ethexe_observer::{RequestBlockData, RequestEvent}; use ethexe_processor::{LocalOutcome, ProcessorConfig}; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; @@ -49,9 +49,7 @@ use utils::*; /// ethexe service. pub struct Service { db: Database, - // Option used to enable "taking" observer from it when calling - // Service methods by `self`. Contract: always `Some`. - observer: Option, + observer: ethexe_observer::Observer, query: ethexe_observer::Query, router_query: RouterQuery, processor: ethexe_processor::Processor, @@ -210,7 +208,7 @@ impl Service { Ok(Self { db, network, - observer: Some(observer), + observer, query, router_query, processor, @@ -249,7 +247,7 @@ impl Service { ) -> Self { Self { db, - observer: Some(observer), + observer, query, router_query, processor, @@ -434,46 +432,37 @@ impl Service { } } - #[cfg(test)] - pub async fn pending_run(self) -> ServicePendingRun { - ServicePendingRun::new(self).await - } - - pub async fn run(mut self) -> Result<()> { - let Some(observer) = self.observer.take() else { - unreachable!("Contract invalidation; qed.") - }; - - if let Some(metrics_service) = self.metrics_service.take() { - tokio::spawn(metrics_service.run( - observer.get_status_receiver(), - self.sequencer.as_mut().map(|s| s.get_status_receiver()), - )); - } - let events_stream_producer = observer.events_stream_producer(); - - self.run_inner(events_stream_producer).await.map_err(|err| { + pub async fn run(self) -> Result<()> { + self.run_inner().await.map_err(|err| { log::error!("Service finished work with error: {:?}", err); err }) } - async fn run_inner(self, events_stream_producer: EventsStreamProducer) -> Result<()> { + async fn run_inner(self) -> Result<()> { let Service { db, network, - mut sequencer, + mut observer, mut query, mut router_query, mut processor, + mut sequencer, + signer: _signer, mut validator, + metrics_service, rpc, block_time, - signer: _signer, - .. } = self; - let observer_events = events_stream_producer.request_events(); + if let Some(metrics_service) = metrics_service { + tokio::spawn(metrics_service.run( + observer.get_status_receiver(), + sequencer.as_mut().map(|s| s.get_status_receiver()), + )); + } + + let observer_events = observer.request_events(); futures::pin_mut!(observer_events); let (mut network_sender, mut network_receiver, mut network_handle) = @@ -869,62 +858,6 @@ impl Service { } } -/// The type was introduced as a solution to the issue#4099. -/// -/// Basically, it splits the events stream creation into two steps: -/// 1) blocks subscription and 2) actually obtaining events in the loop. -/// -/// Usually blocks subscription is done within `async_stream::stream!` in which -/// the events obtaining loop is actually defined. This is a default design and -/// it's too slow as subscription to blocks is scheduled when the async stream is -/// first time polled. -#[cfg(test)] -pub struct ServicePendingRun { - service: Service, - events_stream_producer: EventsStreamProducer, -} - -#[cfg(test)] -impl ServicePendingRun { - async fn new(mut service: Service) -> Self { - let Some(observer) = service.observer.take() else { - unreachable!("Contract invalidation; qed.") - }; - - if let Some(metrics_service) = service.metrics_service.take() { - tokio::spawn(metrics_service.run( - observer.get_status_receiver(), - service.sequencer.as_mut().map(|s| s.get_status_receiver()), - )); - } - - let events_stream_producer = observer - .events_stream_producer() - .with_blocks_subscribed_first() - .await; - - Self { - service, - events_stream_producer, - } - } - - pub async fn complete_run(self) -> Result<()> { - let Self { - service, - events_stream_producer, - } = self; - - service - .run_inner(events_stream_producer) - .await - .map_err(|err| { - log::error!("Service finished work with error: {:?}", err); - err - }) - } -} - mod utils { use super::*; diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 0901cf12c68..dd200fcc545 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -825,10 +825,7 @@ mod utils { (rpc_url, None) } Err(_) => { - let anvil = Anvil::new() - .block_time(block_time.as_secs()) - .try_spawn() - .unwrap(); + let anvil = Anvil::new().try_spawn().unwrap(); log::info!("📍 Anvil started at {}", anvil.ws_endpoint()); (anvil.ws_endpoint(), Some(anvil)) } @@ -861,7 +858,7 @@ mod utils { .expect("failed to create observer"); let (broadcaster, _events_stream) = { - let observer = observer.clone(); + let mut observer = observer.clone(); let (sender, mut receiver) = tokio::sync::broadcast::channel::(2048); let sender = Arc::new(Mutex::new(sender)); let cloned_sender = sender.clone(); @@ -869,7 +866,7 @@ mod utils { let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let handle = task::spawn(async move { - let observer_events = observer.events_stream_producer().events_all(); + let observer_events = observer.events_all(); futures::pin_mut!(observer_events); send_subscription_created.send(()).unwrap(); @@ -1335,9 +1332,11 @@ mod utils { None, None, ); - let service_pending_run = service.pending_run().await; - let handle = task::spawn(service_pending_run.complete_run()); + let handle = task::spawn(service.run()); self.running_service_handle = Some(handle); + + // Sleep to wait for the new service to start + tokio::time::sleep(Duration::from_secs(1)).await; } pub async fn stop_service(&mut self) { diff --git a/ethexe/ethereum/src/lib.rs b/ethexe/ethereum/src/lib.rs index b9cca970643..76b268df6f3 100644 --- a/ethexe/ethereum/src/lib.rs +++ b/ethexe/ethereum/src/lib.rs @@ -301,7 +301,7 @@ impl TryGetReceipt for PendingTransactio }; log::trace!("Failed to get transaction receipt for {tx_hash}. Retrying..."); - for n in 0..25 { + for n in 0..3 { log::trace!("Attempt {n}. Error - {err}"); match err { PendingTransactionError::TransportError(RpcError::NullResp) => {} diff --git a/ethexe/observer/src/lib.rs b/ethexe/observer/src/lib.rs index 5a5524194eb..3a9e1a23e92 100644 --- a/ethexe/observer/src/lib.rs +++ b/ethexe/observer/src/lib.rs @@ -25,5 +25,5 @@ mod query; pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader}; pub use event::{BlockData, Event, RequestBlockData, RequestEvent, SimpleBlockData}; -pub use observer::{EventsStreamProducer, Observer, ObserverStatus}; +pub use observer::{Observer, ObserverStatus}; pub use query::Query; diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 4e8039e6ec0..3fa8d2dbbed 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -29,15 +29,22 @@ pub(crate) const MAX_QUERY_BLOCK_RANGE: u64 = 100_000; pub(crate) type ObserverProvider = RootProvider; -#[derive(Clone)] pub struct Observer { provider: ObserverProvider, router_address: AlloyAddress, + // Always `Some` + blocks_subscription: Option>, blob_reader: Arc, status_sender: watch::Sender, status: ObserverStatus, } +impl Clone for Observer { + fn clone(&self) -> Self { + self.clone_with_resubscribe() + } +} + #[derive(Debug, Clone, Copy, Default)] pub struct ObserverStatus { pub eth_block_number: u64, @@ -45,49 +52,6 @@ pub struct ObserverStatus { pub last_router_state: u64, } -impl Observer { - pub async fn new( - ethereum_rpc: &str, - router_address: Address, - blob_reader: Arc, - ) -> Result { - let (status_sender, _status_receiver) = watch::channel(ObserverStatus::default()); - Ok(Self { - provider: ProviderBuilder::new().on_builtin(ethereum_rpc).await?, - router_address: AlloyAddress::new(router_address.0), - blob_reader, - status: Default::default(), - status_sender, - }) - } - - pub fn get_status_receiver(&self) -> watch::Receiver { - self.status_sender.subscribe() - } - - fn update_status(&mut self, update_fn: F) - where - F: FnOnce(&mut ObserverStatus), - { - update_fn(&mut self.status); - let _ = self.status_sender.send_replace(self.status); - } - - pub fn provider(&self) -> &ObserverProvider { - &self.provider - } - - /// Returns events stream producer, which produces a stream for polling events - /// in observed blocks. - pub fn events_stream_producer(self) -> EventsStreamProducer { - EventsStreamProducer::new(self) - } - - async fn subscribe_blocks(&self) -> Result> { - self.provider.subscribe_blocks().await.map_err(Into::into) - } -} - macro_rules! define_event_stream_method { ( $method_name:ident, @@ -97,21 +61,12 @@ macro_rules! define_event_stream_method { $block_data_type:ty, $event_type:ty ) => { - pub fn $method_name(self) -> impl Stream + 'static { - let Self { - mut observer, - maybe_subscription, - } = self; + pub fn $method_name(&mut self) -> impl Stream + '_ { + let new_subscription = self.resubscribe_blocks(); + let old_subscription = self.blocks_subscription.take().expect("always some"); + self.blocks_subscription = Some(new_subscription); async_stream::stream! { - let blocks_subscription = match maybe_subscription { - Some(subscription) => subscription, - // TODO #4335: always subscribe blocks when Observer is created. - None => observer - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks"), - }; - let mut block_stream = blocks_subscription.into_stream(); + let mut block_stream = old_subscription.into_stream(); let mut futures = FuturesUnordered::new(); loop { @@ -129,7 +84,7 @@ macro_rules! define_event_stream_method { let block_number = block.header.number; let block_timestamp = block.header.timestamp; - let events = match $read_events_fn(block_hash, &observer.provider, observer.router_address).await { + let events = match $read_events_fn(block_hash, &self.provider, self.router_address).await { Ok(events) => events, Err(err) => { log::error!("failed to read events: {err}"); @@ -143,7 +98,7 @@ macro_rules! define_event_stream_method { if let $block_event_type::Router($router_event_type::CodeValidationRequested { code_id, blob_tx_hash }) = event { codes_len += 1; - let blob_reader = observer.blob_reader.clone(); + let blob_reader = self.blob_reader.clone(); let code_id = *code_id; let blob_tx_hash = *blob_tx_hash; @@ -154,7 +109,7 @@ macro_rules! define_event_stream_method { } } - observer.update_status(|status| { + self.update_status(|status| { status.eth_block_number = block_number; if codes_len > 0 { status.last_router_state = block_number; @@ -188,38 +143,39 @@ macro_rules! define_event_stream_method { } } -/// Events stream producer. -/// -/// Produces events stream that yields obtained from the observed chain blocks. -pub struct EventsStreamProducer { - observer: Observer, - maybe_subscription: Option>, -} +impl Observer { + pub async fn new( + ethereum_rpc: &str, + router_address: Address, + blob_reader: Arc, + ) -> Result { + let (status_sender, _status_receiver) = watch::channel(ObserverStatus::default()); + let provider = ProviderBuilder::new().on_builtin(ethereum_rpc).await?; + let blocks_subscription = provider.subscribe_blocks().await?; + Ok(Self { + provider, + router_address: AlloyAddress::new(router_address.0), + blocks_subscription: Some(blocks_subscription), + blob_reader, + status: Default::default(), + status_sender, + }) + } -impl EventsStreamProducer { - /// Creates an events stream producer from the `observer`. - pub fn new(observer: Observer) -> Self { - Self { - observer, - maybe_subscription: None, - } + pub fn get_status_receiver(&self) -> watch::Receiver { + self.status_sender.subscribe() } - /// Subscribes to blocks and stores the subscription to be used - /// when events stream is created. - /// - /// For more info read `ethexe_cli::service::ServicePendingRun` docs. - pub async fn with_blocks_subscribed_first(self) -> Self { - let subscription = self - .observer - .subscribe_blocks() - .await - .expect("failed to subscribe to blocks"); + fn update_status(&mut self, update_fn: F) + where + F: FnOnce(&mut ObserverStatus), + { + update_fn(&mut self.status); + let _ = self.status_sender.send_replace(self.status); + } - Self { - observer: self.observer, - maybe_subscription: Some(subscription), - } + pub fn provider(&self) -> &ObserverProvider { + &self.provider } define_event_stream_method!( @@ -239,6 +195,30 @@ impl EventsStreamProducer { RequestBlockData, RequestEvent ); + + /// Clones the `Observer` with resubscribing to blocks. + /// + /// Resubscription here is the same as calling provider's `subscibe_blocks` + /// method from the sense of both approaches will result in receiving only new blocks. + /// All the previous blocks queued in the inner channel of the subscription won't be + /// accessible by the new subscription. + pub fn clone_with_resubscribe(&self) -> Self { + Self { + provider: self.provider.clone(), + router_address: self.router_address, + blocks_subscription: Some(self.resubscribe_blocks()), + blob_reader: self.blob_reader.clone(), + status_sender: self.status_sender.clone(), + status: self.status, + } + } + + fn resubscribe_blocks(&self) -> Subscription { + // `expect` is called to state the invariant` that `blocks_subscription` is always `Some`. + let subscription_ref = self.blocks_subscription.as_ref().expect("always some"); + + subscription_ref.resubscribe() + } } pub(crate) async fn read_code_from_tx_hash( @@ -544,11 +524,11 @@ mod tests { let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let handle = task::spawn(async move { - let observer = Observer::new(ðereum_rpc, router_address, cloned_blob_reader) + let mut observer = Observer::new(ðereum_rpc, router_address, cloned_blob_reader) .await .expect("failed to create observer"); - let observer_events = observer.events_stream_producer().events_all(); + let observer_events = observer.events_all(); futures::pin_mut!(observer_events); send_subscription_created.send(()).unwrap(); From 1b6609feece42e4229b0d0e3feb879f49070a8f8 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Mon, 2 Dec 2024 13:13:49 +0300 Subject: [PATCH 8/8] clean-up --- ethexe/cli/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index 0af490b653c..1000cfb0527 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -434,7 +434,7 @@ impl Service { pub async fn run(self) -> Result<()> { self.run_inner().await.map_err(|err| { - log::error!("Service finished work with error: {:?}", err); + log::error!("Service finished work with error: {err:?}"); err }) }