Skip to content

Commit

Permalink
add bundle stream
Browse files Browse the repository at this point in the history
  • Loading branch information
itamarreif committed Nov 4, 2024
1 parent 25e1572 commit 45810bf
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 48 deletions.
10 changes: 7 additions & 3 deletions crates/astria-auctioneer/src/auction/allocation_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ impl FirstPrice {
}
}

pub(crate) fn bid(&mut self, _bid: Bundle) -> bool {
// save the bid if its higher than self.highest_bid
unimplemented!()
pub(crate) fn bid(&mut self, bundle: Bundle) -> bool {
if bundle.bid() > self.highest_bid.as_ref().map_or(0, |b| b.bid()) {
self.highest_bid = Some(bundle);
true
} else {
false
}
}

pub(crate) fn highest_bid(self) -> Option<Bundle> {
Expand Down
8 changes: 6 additions & 2 deletions crates/astria-auctioneer/src/auction/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub(crate) struct Builder {
pub(crate) sequencer_key: SequencerKey,
/// The denomination of the fee asset used in the sequencer transactions
pub(crate) fee_asset_denomination: asset::Denom,
/// The chain ID used for sequencer transactions
pub(crate) sequencer_chain_id: String,
/// The rollup ID used for `RollupDataSubmission` with the auction result
pub(crate) rollup_id: RollupId,
}
Expand All @@ -51,6 +53,7 @@ impl Builder {
fee_asset_denomination,
rollup_id,
sequencer_key,
sequencer_chain_id,
} = self;

let (executed_block_tx, executed_block_rx) = oneshot::channel();
Expand All @@ -59,7 +62,7 @@ impl Builder {
// TODO: get the capacity from config or something instead of using a magic number
let (new_bundles_tx, new_bundles_rx) = mpsc::channel(16);

let driver = Auction {
let auction = Auction {
metrics,
shutdown_token,
sequencer_grpc_endpoint,
Expand All @@ -72,6 +75,7 @@ impl Builder {
latency_margin,
sequencer_key,
fee_asset_denomination,
sequencer_chain_id,
rollup_id,
};

Expand All @@ -82,7 +86,7 @@ impl Builder {
start_timer_tx: Some(block_commitment_tx),
abort_tx: Some(reorg_tx),
},
driver,
auction,
)
}
}
21 changes: 11 additions & 10 deletions crates/astria-auctioneer/src/auction/manager.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use std::collections::HashMap;

use astria_core::{
generated::bundle::v1alpha1::Bundle,
primitive::v1::{
asset,
RollupId,
},
use astria_core::primitive::v1::{
asset,
RollupId,
};
use astria_eyre::eyre::{
self,
Expand All @@ -19,6 +16,7 @@ use tokio_util::{
use tracing::instrument;

use super::{
Bundle,
Handle,
Id,
SequencerKey,
Expand Down Expand Up @@ -102,6 +100,7 @@ impl Manager {
auction_id,
sequencer_key: self.sequencer_key.clone(),
fee_asset_denomination: self.fee_asset_denomination.clone(),
sequencer_chain_id: self.sequencer_chain_id.clone(),
rollup_id: self.rollup_id,
}
.build();
Expand Down Expand Up @@ -138,10 +137,12 @@ impl Manager {
.wrap_err("failed to start processing bids")
}

pub(crate) fn try_send_bundle(&mut self, _auction_id: Id, _bundle: Bundle) -> eyre::Result<()> {
unimplemented!()
// try to get the handle for the appropriate auction
// try send into that auction
pub(crate) fn try_send_bundle(&mut self, auction_id: Id, bundle: Bundle) -> eyre::Result<()> {
self.auction_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?
.try_send_bundle(bundle)
.wrap_err("failed to add bundle to auction")
}

pub(crate) async fn join_next(&mut self) -> Option<(Id, eyre::Result<()>)> {
Expand Down
18 changes: 11 additions & 7 deletions crates/astria-auctioneer/src/auction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use astria_eyre::eyre::{
OptionExt as _,
};
pub(crate) use builder::Builder;
use telemetry::display::base64;
use tokio::{
select,
sync::{
Expand All @@ -24,6 +25,7 @@ use tokio::{
},
};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
bundle::Bundle,
Expand Down Expand Up @@ -88,9 +90,7 @@ impl Handle {
Ok(())
}

pub(crate) fn send_bundle_timeout(&mut self, bundle: Bundle) -> eyre::Result<()> {
const BUNDLE_TIMEOUT: Duration = Duration::from_millis(100);

pub(crate) fn try_send_bundle(&mut self, bundle: Bundle) -> eyre::Result<()> {
self.new_bundles_tx
.try_send(bundle)
.wrap_err("bid channel full")?;
Expand Down Expand Up @@ -125,14 +125,16 @@ struct Auction {
sequencer_key: SequencerKey,
/// Fee asset for submitting transactions
fee_asset_denomination: asset::Denom,
/// The chain ID used for sequencer transactions
sequencer_chain_id: String,
/// Rollup ID to submit the auction result to
rollup_id: RollupId,
}

impl Auction {
pub(crate) async fn run(mut self) -> eyre::Result<()> {
let mut latency_margin_timer = None;
let allocation_rule = FirstPrice::new();
let mut allocation_rule = FirstPrice::new();
let mut auction_is_open = false;

let mut nonce_fetch: Option<tokio::task::JoinHandle<eyre::Result<u32>>> = None;
Expand Down Expand Up @@ -182,9 +184,11 @@ impl Auction {
}));
}

// TODO: new bundles from the bundle stream if auction exists?
// - add the bid to the auction if executed
// Some(bid) = &mut self.bids_rx, if auction_is_open() => {}
Some(bundle) = self.new_bundles_rx.recv(), if auction_is_open => {
if allocation_rule.bid(bundle.clone()) {
info!(auction.id = %base64(self.auction_id), bundle.bid = %bundle.bid(), "new highest bid")
}
}

}
};
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-auctioneer/src/block/block_commitment_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use futures::{
};

use super::Commitment;
use crate::sequencer_grpc_client::SequencerGrpcClient;
use crate::sequencer_grpc_client::OptimisticBlockClient;

/// A stream for receiving committed blocks from the sequencer.
pub(crate) struct BlockCommitmentStream {
client: Pin<Box<tonic::Streaming<GetBlockCommitmentStreamResponse>>>,
}

impl BlockCommitmentStream {
pub(crate) async fn connect(mut sequencer_client: SequencerGrpcClient) -> eyre::Result<Self> {
pub(crate) async fn connect(mut sequencer_client: OptimisticBlockClient) -> eyre::Result<Self> {
let committed_stream_client = sequencer_client
.get_block_commitment_stream()
.await
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-auctioneer/src/block/optimistic_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::{
};

use super::Optimistic;
use crate::sequencer_grpc_client::SequencerGrpcClient;
use crate::sequencer_grpc_client::OptimisticBlockClient;

/// A stream for receiving optimistic blocks from the sequencer.
pub(crate) struct OptimisticBlockStream {
Expand All @@ -25,7 +25,7 @@ pub(crate) struct OptimisticBlockStream {
impl OptimisticBlockStream {
pub(crate) async fn connect(
rollup_id: RollupId,
mut sequencer_client: SequencerGrpcClient,
mut sequencer_client: OptimisticBlockClient,
) -> eyre::Result<OptimisticBlockStream> {
let optimistic_stream_client = sequencer_client
.get_optimistic_block_stream(rollup_id)
Expand Down
26 changes: 26 additions & 0 deletions crates/astria-auctioneer/src/bundle/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ use astria_core::generated::bundle::v1alpha1::{
};
use astria_eyre::eyre::{
self,
OptionExt,
WrapErr as _,
};
use axum::http::Uri;
use futures::{
Stream,
StreamExt,
};
use tonic::transport::Endpoint;
use tracing::{
warn,
Expand All @@ -21,6 +26,8 @@ use tracing::{
};
use tryhard::backoff_strategies::ExponentialBackoff;

use super::Bundle;

pub(crate) struct BundleClient {
inner: BundleServiceClient<tonic::transport::Channel>,
uri: Uri,
Expand Down Expand Up @@ -106,3 +113,22 @@ impl BundleStream {
})
}
}

impl Stream for BundleStream {
type Item = eyre::Result<Bundle>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let raw = futures::ready!(self.client.poll_next_unpin(cx))
.ok_or_eyre("stream has been closed")?
.wrap_err("received gRPC error")?
.bundle
.ok_or_eyre("bundle stream response did not contain bundle")?;

let bundle = Bundle::try_from_raw(raw).wrap_err("failed to parse raw Bundle")?;

std::task::Poll::Ready(Some(Ok(bundle)))
}
}
1 change: 1 addition & 0 deletions crates/astria-auctioneer/src/bundle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use astria_eyre::eyre::{
WrapErr as _,
};
use bytes::Bytes;
pub(crate) use client::BundleStream;
use prost::Message as _;

mod client;
Expand Down
42 changes: 28 additions & 14 deletions crates/astria-auctioneer/src/optimistic_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ use crate::{
executed_stream::ExecutedBlockStream,
optimistic_stream::OptimisticBlockStream,
},
sequencer_grpc_client::SequencerGrpcClient,
bundle::{
Bundle,
BundleStream,
},
sequencer_grpc_client::OptimisticBlockClient,
sequencer_key::SequencerKey,
};

Expand Down Expand Up @@ -63,7 +67,7 @@ impl Startup {
sequencer_chain_id,
} = self;

let sequencer_client = SequencerGrpcClient::new(&sequencer_grpc_endpoint)
let sequencer_client = OptimisticBlockClient::new(&sequencer_grpc_endpoint)
.wrap_err("failed to initialize sequencer grpc client")?;
// TODO: have a connect streams helper?
let mut optimistic_blocks =
Expand All @@ -76,10 +80,13 @@ impl Startup {
.wrap_err("failed to initialize block commitment stream")?;

let (blocks_to_execute_handle, executed_blocks) =
ExecutedBlockStream::connect(rollup_id, rollup_grpc_endpoint)
ExecutedBlockStream::connect(rollup_id, rollup_grpc_endpoint.clone())
.await
.wrap_err("failed to initialize executed block stream")?;

let bundle_stream = BundleStream::connect(rollup_grpc_endpoint)
.await
.wrap_err("failed to initialize bundle stream")?;
// let bundle_stream = BundleServiceClient::new(bundle_service_grpc_url)
// .wrap_err("failed to initialize bundle service grpc client")?;

Expand Down Expand Up @@ -112,6 +119,7 @@ impl Startup {
block_commitments,
executed_blocks,
blocks_to_execute_handle,
bundle_stream,
auctions,
current_block,
})
Expand All @@ -125,6 +133,7 @@ pub(crate) struct Running {
block_commitments: BlockCommitmentStream,
executed_blocks: ExecutedBlockStream,
blocks_to_execute_handle: block::executed_stream::Handle,
bundle_stream: BundleStream,
auctions: auction::Manager,
current_block: block::Current,
}
Expand Down Expand Up @@ -163,17 +172,11 @@ impl Running {
self.executed_block_handler(executed_block).wrap_err("failed to handle executed block")?;
}

// 2. forward bundles from bundle stream into the correct auction fut
// - bundles will build up in the channel into the auction until the executed signal is
// sent to the auction fut. so if backpressure builds up here, i.e. bids arrive way
// before execution, we can decide how to react here.
// for example, we can drop all the bundles that are stuck in the channel and log a warning,
// or we can kill the auction for that given block
// Some(res) = bundle_stream.next() => {
// filter by auction id/block hash
// push into correct mpsc channel
// TODO: document how backpressure is handled here
// }
Some(res) = self.bundle_stream.next() => {
let bundle = res.wrap_err("failed to get bundle")?;

self.bundle_handler(bundle).wrap_err("failed to handle bundle")?;
}
}
}
};
Expand Down Expand Up @@ -254,6 +257,17 @@ impl Running {
Ok(())
}

#[instrument(skip(self), fields(auction.id = %base64(self.current_block.sequencer_block_hash())))]
fn bundle_handler(&mut self, bundle: Bundle) -> eyre::Result<()> {
let auction_id =
auction::Id::from_sequencer_block_hash(self.current_block.sequencer_block_hash());
self.auctions
.try_send_bundle(auction_id, bundle)
.wrap_err("failed to submit bundle to auction")?;

Ok(())
}

async fn shutdown(self) {
self.shutdown_token.cancel();
}
Expand Down
Loading

0 comments on commit 45810bf

Please sign in to comment.