Skip to content

Commit

Permalink
feat: cost tracking (#146)
Browse files Browse the repository at this point in the history
Co-authored-by: Ahmed Sagdati <[email protected]>
  • Loading branch information
MujkicA and segfault-magnet authored Nov 27, 2024
1 parent 949ae96 commit 4040689
Show file tree
Hide file tree
Showing 27 changed files with 1,021 additions and 80 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 34 additions & 4 deletions committer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,40 @@ use actix_web::{
error::InternalError, get, http::StatusCode, web, App, HttpResponse, HttpServer, Responder,
};
use ports::storage::Storage;
use services::{HealthReporter, StatusReporter};
use serde::Deserialize;
use services::{CostReporter, HealthReporter, StatusReporter};

use crate::{
config::Config,
config::{Config, Internal},
errors::{Error, Result},
Database,
};

pub async fn launch_api_server(
config: &Config,
internal_config: &Internal,
metrics_registry: Registry,
storage: impl Storage + 'static,
storage: impl Storage + 'static + Clone,
fuel_health_check: HealthChecker,
eth_health_check: HealthChecker,
) -> Result<()> {
let metrics_registry = Arc::new(metrics_registry);
let status_reporter = Arc::new(StatusReporter::new(storage));
let status_reporter = Arc::new(StatusReporter::new(storage.clone()));
let health_reporter = Arc::new(HealthReporter::new(fuel_health_check, eth_health_check));
let cost_reporter = Arc::new(CostReporter::new(
storage,
internal_config.cost_request_limit,
));
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(Arc::clone(&metrics_registry)))
.app_data(web::Data::new(Arc::clone(&status_reporter)))
.app_data(web::Data::new(Arc::clone(&health_reporter)))
.app_data(web::Data::new(Arc::clone(&cost_reporter)))
.service(status)
.service(metrics)
.service(health)
.service(costs)
})
.bind((config.app.host, config.app.port))
.map_err(|e| Error::Other(e.to_string()))?
Expand Down Expand Up @@ -80,6 +88,28 @@ async fn metrics(registry: web::Data<Arc<Registry>>) -> impl Responder {
std::result::Result::<_, InternalError<_>>::Ok(text)
}

#[derive(Deserialize)]
struct CostQueryParams {
from_height: u32,
limit: Option<usize>,
}

#[get("/v1/costs")]
async fn costs(
data: web::Data<Arc<CostReporter<Database>>>,
query: web::Query<CostQueryParams>,
) -> impl Responder {
let limit = query.limit.unwrap_or(100);

match data.get_costs(query.from_height, limit).await {
Ok(bundle_costs) => HttpResponse::Ok().json(bundle_costs),
Err(services::Error::Other(e)) => {
HttpResponse::from_error(InternalError::new(e, StatusCode::BAD_REQUEST))
}
Err(e) => HttpResponse::from_error(map_to_internal_err(e)),
}
}

fn map_to_internal_err(error: impl std::error::Error) -> InternalError<String> {
InternalError::new(error.to_string(), StatusCode::INTERNAL_SERVER_ERROR)
}
2 changes: 2 additions & 0 deletions committer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ pub struct Internal {
pub fuel_errors_before_unhealthy: usize,
pub eth_errors_before_unhealthy: usize,
pub balance_update_interval: Duration,
pub cost_request_limit: usize,
}

impl Default for Internal {
Expand All @@ -204,6 +205,7 @@ impl Default for Internal {
fuel_errors_before_unhealthy: 3,
eth_errors_before_unhealthy: 3,
balance_update_interval: Duration::from_secs(10),
cost_request_limit: 1000,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions committer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ async fn main() -> Result<()> {

launch_api_server(
&config,
&internal_config,
metrics_registry,
storage.clone(),
fuel_health_check,
Expand Down
19 changes: 18 additions & 1 deletion e2e/src/committer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{path::Path, time::Duration};

use anyhow::Context;
use ports::types::Address;
use ports::types::{Address, BundleCost};
use url::Url;

#[derive(Default)]
Expand Down Expand Up @@ -266,4 +266,21 @@ impl CommitterProcess {
.expect("metric format to be in the format 'NAME VAL'")
.parse()?)
}

pub async fn fetch_costs(
&self,
from_height: u32,
limit: usize,
) -> anyhow::Result<Vec<BundleCost>> {
let response = reqwest::get(format!(
"http://localhost:{}/v1/costs?from_height={}&limit={}",
self.port, from_height, limit
))
.await?
.error_for_status()?
.json()
.await?;

Ok(response)
}
}
3 changes: 3 additions & 0 deletions e2e/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ mod tests {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}

let bundle_cost = stack.committer.fetch_costs(0, 10).await?.pop().unwrap();
assert!(bundle_cost.cost > 0);

Ok(())
}

Expand Down
14 changes: 14 additions & 0 deletions packages/eth/src/websocket/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,16 @@ impl WsConnection {

let block_number = Self::extract_block_number_from_receipt(&tx_receipt)?;

let fee = tx_receipt
.gas_used
.saturating_mul(tx_receipt.effective_gas_price);
let blob_fee = Self::extract_blob_fee_from_receipt(&tx_receipt);

Ok(Some(TransactionResponse::new(
block_number,
tx_receipt.status(),
fee,
blob_fee,
)))
}

Expand All @@ -449,6 +456,13 @@ impl WsConnection {
Error::Other("transaction receipt does not contain block number".to_string())
})
}

fn extract_blob_fee_from_receipt(receipt: &TransactionReceipt) -> u128 {
match (receipt.blob_gas_used, receipt.blob_gas_price) {
(Some(gas_used), Some(gas_price)) => gas_used.saturating_mul(gas_price),
_ => 0,
}
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions packages/ports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ itertools = { workspace = true, features = ["use_std"], optional = true }
mockall = { workspace = true, optional = true }
nonempty = { workspace = true }
rand = { workspace = true, optional = true }
serde = { workspace = true }
sqlx = { workspace = true, features = ["chrono"] }
thiserror = { workspace = true, optional = true }
trait-variant = { workspace = true, optional = true }
Expand Down
Loading

0 comments on commit 4040689

Please sign in to comment.