Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ethexe): Subscribe blocks when Observer is created #4311

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
115 changes: 91 additions & 24 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use ethexe_common::{
use ethexe_db::{BlockMetaStorage, CodesStorage, Database};
use ethexe_ethereum::{primitives::U256, router::RouterQuery};
use ethexe_network::{db_sync, NetworkReceiverEvent};
use ethexe_observer::{RequestBlockData, RequestEvent};
use ethexe_observer::{EventsStreamProducer, RequestBlockData, RequestEvent};
use ethexe_processor::{LocalOutcome, ProcessorConfig};
use ethexe_sequencer::agro::AggregatedCommitments;
use ethexe_signer::{Digest, PublicKey, Signature, Signer};
Expand All @@ -51,7 +51,9 @@ use utils::*;
/// ethexe service.
pub struct Service {
db: Database,
observer: ethexe_observer::Observer,
// Option used to enable "taking" observer from it when calling
// Service methods by `self`. Contract: always `Some`.
observer: Option<ethexe_observer::Observer>,
query: ethexe_observer::Query,
router_query: RouterQuery,
processor: ethexe_processor::Processor,
Expand Down Expand Up @@ -201,7 +203,7 @@ impl Service {
Ok(Self {
db,
network,
observer,
observer: Some(observer),
query,
router_query,
processor,
Expand Down Expand Up @@ -240,7 +242,7 @@ impl Service {
) -> Self {
Self {
db,
observer,
observer: Some(observer),
query,
router_query,
processor,
Expand Down Expand Up @@ -418,30 +420,46 @@ impl Service {
}
}

async fn run_inner(self) -> Result<()> {
#[cfg(test)]
pub async fn pending_run(self) -> ServicePendingRun {
ServicePendingRun::new(self).await
}

pub async fn run(mut self) -> Result<()> {
let Some(observer) = self.observer.take() else {
unreachable!("Contract invalidation; qed.")
};

if let Some(metrics_service) = self.metrics_service.take() {
tokio::spawn(metrics_service.run(
observer.get_status_receiver(),
self.sequencer.as_mut().map(|s| s.get_status_receiver()),
));
}
let events_stream_producer = observer.events_stream_producer();

self.run_inner(events_stream_producer).await.map_err(|err| {
log::error!("Service finished work with error: {:?}", err);
techraed marked this conversation as resolved.
Show resolved Hide resolved
err
})
}

async fn run_inner(self, events_stream_producer: EventsStreamProducer) -> Result<()> {
let Service {
db,
network,
mut observer,
mut sequencer,
mut query,
mut router_query,
mut processor,
mut sequencer,
signer: _signer,
mut validator,
metrics_service,
rpc,
block_time,
signer: _signer,
StackOverflowExcept1on marked this conversation as resolved.
Show resolved Hide resolved
..
} = self;

if let Some(metrics_service) = metrics_service {
tokio::spawn(metrics_service.run(
observer.get_status_receiver(),
sequencer.as_mut().map(|s| s.get_status_receiver()),
));
}

let observer_events = observer.request_events();
let observer_events = events_stream_producer.request_events();
futures::pin_mut!(observer_events);

let (mut network_sender, mut network_receiver, mut network_handle) =
Expand Down Expand Up @@ -577,13 +595,6 @@ impl Service {
Ok(())
}

pub async fn run(self) -> Result<()> {
self.run_inner().await.map_err(|err| {
log::error!("Service finished work with error: {:?}", err);
err
})
}

async fn post_process_commitments(
code_commitments: Vec<CodeCommitment>,
block_commitments: Vec<BlockCommitment>,
Expand Down Expand Up @@ -844,6 +855,62 @@ impl Service {
}
}

/// The type was introduced as a solution to the issue#4099.
///
/// Basically, it splits the events stream creation into two steps:
/// 1) blocks subscription and 2) actually obtaining events in the loop.
///
/// Usually blocks subscription is done within `async_stream::stream!` in which
/// the events obtaining loop is actually defined. This is a default design and
/// it's too slow as subscription to blocks is scheduled when the async stream is
/// first time polled.
#[cfg(test)]
pub struct ServicePendingRun {
service: Service,
events_stream_producer: EventsStreamProducer,
}

#[cfg(test)]
impl ServicePendingRun {
async fn new(mut service: Service) -> Self {
let Some(observer) = service.observer.take() else {
unreachable!("Contract invalidation; qed.")
};

if let Some(metrics_service) = service.metrics_service.take() {
tokio::spawn(metrics_service.run(
observer.get_status_receiver(),
service.sequencer.as_mut().map(|s| s.get_status_receiver()),
));
}

let events_stream_producer = observer
.events_stream_producer()
.with_blocks_subscribed_first()
.await;

Self {
service,
events_stream_producer,
}
}

pub async fn complete_run(self) -> Result<()> {
let Self {
service,
events_stream_producer,
} = self;

service
.run_inner(events_stream_producer)
.await
.map_err(|err| {
log::error!("Service finished work with error: {:?}", err);
techraed marked this conversation as resolved.
Show resolved Hide resolved
err
})
}
}

mod utils {
use super::*;

Expand Down
24 changes: 8 additions & 16 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,6 @@ async fn ping_reorg() {
assert_eq!(res.program_id, ping_id);
assert_eq!(res.reply_payload, b"PONG");

// Await for service block with user reply handling
// TODO: this is for better logs reading only, should find a better solution #4099
tokio::time::sleep(env.block_time).await;

log::info!("📗 Test after reverting to the program creation snapshot");
provider
.anvil_revert(program_created_snapshot_id)
Expand Down Expand Up @@ -545,10 +541,6 @@ async fn ping_reorg() {
let res = send_message.wait_for().await.unwrap();
assert_eq!(res.program_id, ping_id);
assert_eq!(res.reply_payload, b"PONG");

// Await for service block with user reply handling
// TODO: this is for better logs reading only, should find a better solution #4099
tokio::time::sleep(Duration::from_secs(1)).await;
}

// Mine 150 blocks - send message - mine 150 blocks.
Expand Down Expand Up @@ -832,7 +824,10 @@ mod utils {
(rpc_url, None)
}
Err(_) => {
let anvil = Anvil::new().try_spawn().unwrap();
let anvil = Anvil::new()
.block_time(block_time.as_secs())
.try_spawn()
.unwrap();
log::info!("📍 Anvil started at {}", anvil.ws_endpoint());
(anvil.ws_endpoint(), Some(anvil))
}
Expand Down Expand Up @@ -865,15 +860,15 @@ mod utils {
.expect("failed to create observer");

let (broadcaster, _events_stream) = {
let mut observer = observer.clone();
let observer = observer.clone();
let (sender, mut receiver) = tokio::sync::broadcast::channel::<Event>(2048);
let sender = Arc::new(Mutex::new(sender));
let cloned_sender = sender.clone();

let (send_subscription_created, receive_subscription_created) =
oneshot::channel::<()>();
let handle = task::spawn(async move {
let observer_events = observer.events_all();
let observer_events = observer.events_stream_producer().events_all();
futures::pin_mut!(observer_events);

send_subscription_created.send(()).unwrap();
Expand Down Expand Up @@ -1339,12 +1334,9 @@ mod utils {
None,
None,
);

let handle = task::spawn(service.run());
let service_pending_run = service.pending_run().await;
let handle = task::spawn(service_pending_run.complete_run());
self.running_service_handle = Some(handle);

// Sleep to wait for the new service to start
tokio::time::sleep(Duration::from_secs(1)).await;
}

pub async fn stop_service(&mut self) {
Expand Down
4 changes: 3 additions & 1 deletion ethexe/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ impl<T: Transport + Clone, N: Network> TryGetReceipt<T, N> for PendingTransactio
Err(err) => err,
};

for _ in 0..3 {
log::trace!("Failed to get transaction receipt for {tx_hash}. Retrying...");
for n in 0..25 {
techraed marked this conversation as resolved.
Show resolved Hide resolved
log::trace!("Attempt {n}. Error - {err}");
match err {
PendingTransactionError::TransportError(RpcError::NullResp) => {}
_ => break,
Expand Down
2 changes: 1 addition & 1 deletion ethexe/observer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ mod query;

pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader};
pub use event::{BlockData, Event, RequestBlockData, RequestEvent, SimpleBlockData};
pub use observer::{Observer, ObserverStatus};
pub use observer::{EventsStreamProducer, Observer, ObserverStatus};
pub use query::Query;
Loading
Loading