Skip to content

Commit

Permalink
add exponential backoff retries if the beacon node event stream is down
Browse files Browse the repository at this point in the history
  • Loading branch information
ralexstokes committed Oct 30, 2023
1 parent 82665f6 commit 1356d02
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 17 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mev-relay-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ futures = "0.3.21"
async-trait = "0.1.53"
parking_lot = "0.12.1"
pin-project = "1.0.12"
backoff = { version = "0.4.0", features = ["tokio"] }

thiserror = "1.0.30"
http = "0.2.7"
Expand Down
1 change: 1 addition & 0 deletions mev-relay-rs/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ impl Relay {

// TODO: build tip context and support reorgs...
pub fn on_payload_attributes(&self, event: PayloadAttributesEvent) -> Result<(), Error> {
trace!(?event, "processing payload attributes");
let proposer_public_key =
self.validator_registry.get_public_key(event.proposer_index).ok_or_else::<Error, _>(
|| RelayError::UnknownValidatorIndex(event.proposer_index).into(),
Expand Down
49 changes: 32 additions & 17 deletions mev-relay-rs/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::relay::Relay;
use backoff::ExponentialBackoff;
use beacon_api_client::{mainnet::Client, PayloadAttributesTopic};
use ethereum_consensus::{
crypto::SecretKey,
Expand Down Expand Up @@ -77,25 +78,39 @@ impl Service {
let consensus = tokio::spawn(async move {
let relay = relay_clone;

let mut stream = match beacon_node.get_events::<PayloadAttributesTopic>().await {
Ok(events) => events,
Err(err) => {
error!(%err, "could not open payload attributes stream");
return
}
};

while let Some(event) = stream.next().await {
match event {
Ok(event) => {
if let Err(err) = relay.on_payload_attributes(event.data) {
warn!(%err, "could not process payload attributes");
let result = backoff::future::retry::<(), (), _, _, _>(
ExponentialBackoff::default(),
|| async {
let retry = backoff::Error::transient(());
let mut stream = match beacon_node.get_events::<PayloadAttributesTopic>().await
{
Ok(stream) => stream,
Err(err) => {
error!(%err, "could not open payload attributes stream");
return Err(retry)
}
};

while let Some(event) = stream.next().await {
match event {
Ok(event) => {
if let Err(err) = relay.on_payload_attributes(event.data) {
warn!(%err, "could not process payload attributes");
continue
}
}
Err(err) => {
warn!(%err, "error reading payload attributes stream");
return Err(retry)
}
}
}
Err(err) => {
warn!(%err, "error reading payload attributes stream");
}
}
Err(retry)
},
)
.await;
if result.is_err() {
error!("failed to read from event stream");
}
});

Expand Down

0 comments on commit 1356d02

Please sign in to comment.