Skip to content

Commit

Permalink
add bundle stream
Browse files Browse the repository at this point in the history
  • Loading branch information
itamarreif committed Nov 4, 2024
1 parent 25e1572 commit 32eb3bc
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 25 deletions.
8 changes: 6 additions & 2 deletions crates/astria-auctioneer/src/auction/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub(crate) struct Builder {
pub(crate) sequencer_key: SequencerKey,
/// The denomination of the fee asset used in the sequencer transactions
pub(crate) fee_asset_denomination: asset::Denom,
/// The chain ID used for sequencer transactions
pub(crate) sequencer_chain_id: String,
/// The rollup ID used for `RollupDataSubmission` with the auction result
pub(crate) rollup_id: RollupId,
}
Expand All @@ -51,6 +53,7 @@ impl Builder {
fee_asset_denomination,
rollup_id,
sequencer_key,
sequencer_chain_id,
} = self;

let (executed_block_tx, executed_block_rx) = oneshot::channel();
Expand All @@ -59,7 +62,7 @@ impl Builder {
// TODO: get the capacity from config or something instead of using a magic number
let (new_bundles_tx, new_bundles_rx) = mpsc::channel(16);

let driver = Auction {
let auction = Auction {
metrics,
shutdown_token,
sequencer_grpc_endpoint,
Expand All @@ -72,6 +75,7 @@ impl Builder {
latency_margin,
sequencer_key,
fee_asset_denomination,
sequencer_chain_id,
rollup_id,
};

Expand All @@ -82,7 +86,7 @@ impl Builder {
start_timer_tx: Some(block_commitment_tx),
abort_tx: Some(reorg_tx),
},
driver,
auction,
)
}
}
21 changes: 11 additions & 10 deletions crates/astria-auctioneer/src/auction/manager.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use std::collections::HashMap;

use astria_core::{
generated::bundle::v1alpha1::Bundle,
primitive::v1::{
asset,
RollupId,
},
use astria_core::primitive::v1::{
asset,
RollupId,
};
use astria_eyre::eyre::{
self,
Expand All @@ -19,6 +16,7 @@ use tokio_util::{
use tracing::instrument;

use super::{
Bundle,
Handle,
Id,
SequencerKey,
Expand Down Expand Up @@ -102,6 +100,7 @@ impl Manager {
auction_id,
sequencer_key: self.sequencer_key.clone(),
fee_asset_denomination: self.fee_asset_denomination.clone(),
sequencer_chain_id: self.sequencer_chain_id.clone(),
rollup_id: self.rollup_id,
}
.build();
Expand Down Expand Up @@ -138,10 +137,12 @@ impl Manager {
.wrap_err("failed to start processing bids")
}

pub(crate) fn try_send_bundle(&mut self, _auction_id: Id, _bundle: Bundle) -> eyre::Result<()> {
unimplemented!()
// try to get the handle for the appropriate auction
// try send into that auction
pub(crate) fn try_send_bundle(&mut self, auction_id: Id, bundle: Bundle) -> eyre::Result<()> {
self.auction_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?
.try_send_bundle(bundle)
.wrap_err("failed to add bundle to auction")
}

pub(crate) async fn join_next(&mut self) -> Option<(Id, eyre::Result<()>)> {
Expand Down
4 changes: 3 additions & 1 deletion crates/astria-auctioneer/src/auction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Handle {
Ok(())
}

pub(crate) fn send_bundle_timeout(&mut self, bundle: Bundle) -> eyre::Result<()> {
pub(crate) fn try_send_bundle(&mut self, bundle: Bundle) -> eyre::Result<()> {
const BUNDLE_TIMEOUT: Duration = Duration::from_millis(100);

self.new_bundles_tx
Expand Down Expand Up @@ -125,6 +125,8 @@ struct Auction {
sequencer_key: SequencerKey,
/// Fee asset for submitting transactions
fee_asset_denomination: asset::Denom,
/// The chain ID used for sequencer transactions
sequencer_chain_id: String,
/// Rollup ID to submit the auction result to
rollup_id: RollupId,
}
Expand Down
26 changes: 26 additions & 0 deletions crates/astria-auctioneer/src/bundle/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ use astria_core::generated::bundle::v1alpha1::{
};
use astria_eyre::eyre::{
self,
OptionExt,
WrapErr as _,
};
use axum::http::Uri;
use futures::{
Stream,
StreamExt,
};
use tonic::transport::Endpoint;
use tracing::{
warn,
Expand All @@ -21,6 +26,8 @@ use tracing::{
};
use tryhard::backoff_strategies::ExponentialBackoff;

use super::Bundle;

pub(crate) struct BundleClient {
inner: BundleServiceClient<tonic::transport::Channel>,
uri: Uri,
Expand Down Expand Up @@ -106,3 +113,22 @@ impl BundleStream {
})
}
}

impl Stream for BundleStream {
type Item = eyre::Result<Bundle>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let raw = futures::ready!(self.client.poll_next_unpin(cx))
.ok_or_eyre("stream has been closed")?
.wrap_err("received gRPC error")?
.bundle
.ok_or_eyre("bundle stream response did not contain bundle")?;

let bundle = Bundle::try_from_raw(raw).wrap_err("failed to parse raw Bundle")?;

std::task::Poll::Ready(Some(Ok(bundle)))
}
}
1 change: 1 addition & 0 deletions crates/astria-auctioneer/src/bundle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use astria_eyre::eyre::{
WrapErr as _,
};
use bytes::Bytes;
pub(crate) use client::BundleStream;
use prost::Message as _;

mod client;
Expand Down
38 changes: 26 additions & 12 deletions crates/astria-auctioneer/src/optimistic_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ use crate::{
executed_stream::ExecutedBlockStream,
optimistic_stream::OptimisticBlockStream,
},
bundle::{
Bundle,
BundleStream,
},
sequencer_grpc_client::SequencerGrpcClient,
sequencer_key::SequencerKey,
};
Expand Down Expand Up @@ -76,10 +80,13 @@ impl Startup {
.wrap_err("failed to initialize block commitment stream")?;

let (blocks_to_execute_handle, executed_blocks) =
ExecutedBlockStream::connect(rollup_id, rollup_grpc_endpoint)
ExecutedBlockStream::connect(rollup_id, rollup_grpc_endpoint.clone())
.await
.wrap_err("failed to initialize executed block stream")?;

let bundle_stream = BundleStream::connect(rollup_grpc_endpoint)
.await
.wrap_err("failed to initialize bundle stream")?;
// let bundle_stream = BundleServiceClient::new(bundle_service_grpc_url)
// .wrap_err("failed to initialize bundle service grpc client")?;

Expand Down Expand Up @@ -112,6 +119,7 @@ impl Startup {
block_commitments,
executed_blocks,
blocks_to_execute_handle,
bundle_stream,
auctions,
current_block,
})
Expand All @@ -125,6 +133,7 @@ pub(crate) struct Running {
block_commitments: BlockCommitmentStream,
executed_blocks: ExecutedBlockStream,
blocks_to_execute_handle: block::executed_stream::Handle,
bundle_stream: BundleStream,
auctions: auction::Manager,
current_block: block::Current,
}
Expand Down Expand Up @@ -163,17 +172,11 @@ impl Running {
self.executed_block_handler(executed_block).wrap_err("failed to handle executed block")?;
}

// 2. forward bundles from bundle stream into the correct auction fut
// - bundles will build up in the channel into the auction until the executed signal is
// sent to the auction fut. so if backpressure builds up here, i.e. bids arrive way
// before execution, we can decide how to react here.
// for example, we can drop all the bundles that are stuck in the channel and log a warning,
// or we can kill the auction for that given block
// Some(res) = bundle_stream.next() => {
// filter by auction id/block hash
// push into correct mpsc channel
// TODO: document how backpressure is handled here
// }
Some(res) = self.bundle_stream.next() => {
let bundle = res.wrap_err("failed to get bundle")?;

self.bundle_handler(bundle).wrap_err("failed to handle bundle")?;
}
}
}
};
Expand Down Expand Up @@ -254,6 +257,17 @@ impl Running {
Ok(())
}

#[instrument(skip(self), fields(auction.id = %base64(self.current_block.sequencer_block_hash())))]
fn bundle_handler(&mut self, bundle: Bundle) -> eyre::Result<()> {
let auction_id =
auction::Id::from_sequencer_block_hash(self.current_block.sequencer_block_hash());
self.auctions
.try_send_bundle(auction_id, bundle)
.wrap_err("failed to submit bundle to auction")?;

Ok(())
}

async fn shutdown(self) {
self.shutdown_token.cancel();
}
Expand Down

0 comments on commit 32eb3bc

Please sign in to comment.