Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: error handling for route_order and some order duplication #13

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions src/collectors/uniswapx_route_collector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use alloy_primitives::Uint;
use anyhow::{Context, Result};
use anyhow::{anyhow, Result};
use reqwest::header::ORIGIN;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};
Expand All @@ -10,7 +10,7 @@ use artemis_core::types::{Collector, CollectorStream};
use async_trait::async_trait;
use futures::lock::Mutex;
use futures::stream::{FuturesUnordered, StreamExt};
use reqwest::Client;
use reqwest::{Client, StatusCode};

const ROUTING_API: &str = "https://api.uniswap.org/v1/quote";
const SLIPPAGE_TOLERANCE: &str = "0.5";
Expand Down Expand Up @@ -173,7 +173,7 @@ impl Collector<RoutedOrder> for UniswapXRouteCollector {
let stream = async_stream::stream! {
let mut receiver = self.route_request_receiver.lock().await;
while let Some(route_requests) = receiver.recv().await {
let tasks: FuturesUnordered<_> = route_requests.iter()
let tasks: FuturesUnordered<_> = route_requests.into_iter()
.map(|batch| {
let OrderBatchData { orders, token_in, token_out, amount_in, .. } = batch.clone();
info!(
Expand Down Expand Up @@ -232,16 +232,30 @@ pub async fn route_order(params: RouteOrderParams) -> Result<OrderRoute> {

let client = reqwest::Client::new();

Ok(client
let response = client
.get(format!("{}?{}", ROUTING_API, query_string))
.header(ORIGIN, "https://app.uniswap.org")
.header("x-request-source", "uniswap-web")
.send()
.await
.context("Quote request failed with {}")?
.json::<OrderRoute>()
.await
.context("Failed to parse response: {}")?)
.map_err(|e| anyhow!("Quote request failed with error: {}", e))?;

match response.status() {
StatusCode::OK => Ok(response
.json::<OrderRoute>()
.await
.map_err(|e| anyhow!("Failed to parse response: {}", e))?),
StatusCode::BAD_REQUEST => Err(anyhow!("Bad request: {}", response.status())),
StatusCode::NOT_FOUND => Err(anyhow!("Not quote found: {}", response.status())),
StatusCode::TOO_MANY_REQUESTS => Err(anyhow!("Too many requests: {}", response.status())),
StatusCode::INTERNAL_SERVER_ERROR => {
Err(anyhow!("Internal server error: {}", response.status()))
}
_ => Err(anyhow!(
"Unexpected error with status code: {}",
response.status()
)),
}
}

// The Uniswap routing API requires that "ETH" be used instead of the zero address
Expand Down
18 changes: 12 additions & 6 deletions src/executors/protect_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use std::{
};
use tracing::info;

use anyhow::{Context, Result};
use anyhow::Result;
use artemis_core::executors::mempool_executor::SubmitTxToMempool;
use artemis_core::types::Executor;
use async_trait::async_trait;
use ethers::providers::Middleware;
use ethers::{providers::Middleware, types::U256};

/// An executor that sends transactions to the mempool.
pub struct ProtectExecutor<M, N> {
Expand Down Expand Up @@ -39,9 +39,12 @@ where
.client
.estimate_gas(&action.tx, None)
.await
.context("Error estimating gas usage: {}");
.unwrap_or_else(|err| {
info!("Error estimating gas: {}", err);
U256::from(1_000_000)
});
info!("Gas Usage {:?}", gas_usage_result);
let gas_usage = gas_usage_result?;
let gas_usage = gas_usage_result;

let bid_gas_price;
if let Some(gas_bid_info) = action.gas_bid_info {
Expand All @@ -56,12 +59,15 @@ where
.client
.get_gas_price()
.await
.context("Error getting gas price: {}")?;
.map_err(|err| anyhow::anyhow!("Error getting gas price: {}", err))?;
}
action.tx.set_gas_price(bid_gas_price);

info!("Executing tx {:?}", action.tx);
self.sender_client.send_transaction(action.tx, None).await?;
self.sender_client
.send_transaction(action.tx, None)
.await
.map_err(|err| anyhow::anyhow!("Error sending transaction: {}", err))?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion src/executors/public_1559_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ where
info!("Executing tx {:?}", action.execution.tx);
self.sender_client
.send_transaction(action.execution.tx, None)
.await?;
.await
.map_err(|err| anyhow::anyhow!("Error sending transaction: {}", err))?;
Ok(())
}
}
20 changes: 16 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use strategies::{
uniswapx_strategy::UniswapXUniswapFill,
};
use tokio::sync::mpsc::channel;
use tracing::{info, Level};
use tracing::{error, info, Level};
use tracing_subscriber::{filter, prelude::*};

pub mod collectors;
Expand Down Expand Up @@ -175,9 +175,21 @@ async fn main() -> Result<()> {
engine.add_executor(Box::new(protect_executor));
engine.add_executor(Box::new(public_tx_executor));
// Start engine.
if let Ok(mut set) = engine.run().await {
while let Some(res) = set.join_next().await {
info!("res: {:?}", res);
match engine.run().await {
Ok(mut set) => {
while let Some(res) = set.join_next().await {
match res {
Ok(res) => {
info!("res: {:?}", res);
}
Err(e) => {
info!("error: {:?}", e);
}
}
}
}
Err(e) => {
error!("Engine run error: {:?}", e);
}
}
Ok(())
Expand Down
Loading
Loading