From 40406892fb4c18139e7f6c4757c375705bfc5324 Mon Sep 17 00:00:00 2001 From: MujkicA <32431923+MujkicA@users.noreply.github.com> Date: Wed, 27 Nov 2024 12:20:27 +0100 Subject: [PATCH] feat: cost tracking (#146) Co-authored-by: Ahmed Sagdati <37515857+segfault-magnet@users.noreply.github.com> --- ...ad735e226634f70e12df55a43df5c10b1b4da.json | 34 ++ ...af7c57e2ed0b3a2f0773358b1dfef7f7deb4b.json | 23 ++ ...dd4b6322c18b97011bffcbe0cc91e46b1d345.json | 18 + ...f4524a3889215e5056401b4bcd5fe4b027b58.json | 23 ++ ...8bf096f832cde79333f23a28dbef155c6526e.json | 18 + ...93f3050820931aeea93d35239f980a22ced1f.json | 59 +++ Cargo.lock | 1 + committer/src/api.rs | 38 +- committer/src/config.rs | 2 + committer/src/main.rs | 1 + e2e/src/committer.rs | 19 +- e2e/src/lib.rs | 3 + packages/eth/src/websocket/connection.rs | 14 + packages/ports/Cargo.toml | 1 + packages/ports/src/ports/storage.rs | 20 +- packages/ports/src/types.rs | 2 + packages/ports/src/types/bundle_cost.rs | 22 + packages/ports/src/types/transactions.rs | 11 +- packages/services/src/block_committer.rs | 2 +- packages/services/src/cost_reporter.rs | 36 ++ packages/services/src/lib.rs | 8 + packages/services/src/state_listener.rs | 26 +- .../storage/migrations/0007_cost_tracking.sql | 41 ++ packages/storage/src/lib.rs | 223 +++++++++- packages/storage/src/mappings/tables.rs | 64 ++- packages/storage/src/postgres.rs | 384 ++++++++++++++++-- packages/storage/src/test_instance.rs | 8 +- 27 files changed, 1021 insertions(+), 80 deletions(-) create mode 100644 .sqlx/query-417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da.json create mode 100644 .sqlx/query-5cc9cfd8d498774fa4b70dad22caf7c57e2ed0b3a2f0773358b1dfef7f7deb4b.json create mode 100644 .sqlx/query-b6989cabec71adc8953079f03e4dd4b6322c18b97011bffcbe0cc91e46b1d345.json create mode 100644 .sqlx/query-c820bfe642b85081bd8a594d94ff4524a3889215e5056401b4bcd5fe4b027b58.json create mode 100644 .sqlx/query-cc10c6369b02a35fee75d611dbd8bf096f832cde79333f23a28dbef155c6526e.json create mode 100644 .sqlx/query-f1ac90602eca2f5966383dc436593f3050820931aeea93d35239f980a22ced1f.json create mode 100644 packages/ports/src/types/bundle_cost.rs create mode 100644 packages/services/src/cost_reporter.rs create mode 100644 packages/storage/migrations/0007_cost_tracking.sql diff --git a/.sqlx/query-417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da.json b/.sqlx/query-417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da.json new file mode 100644 index 00000000..20e6491f --- /dev/null +++ b/.sqlx/query-417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n f.bundle_id,\n SUM(f.total_bytes)::BIGINT AS total_bytes,\n SUM(f.unused_bytes)::BIGINT AS unused_bytes\n FROM\n l1_blob_transaction t\n JOIN l1_transaction_fragments tf ON t.id = tf.transaction_id\n JOIN l1_fragments f ON tf.fragment_id = f.id\n WHERE\n t.hash = $1\n GROUP BY\n f.bundle_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bundle_id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "total_bytes", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "unused_bytes", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [ + false, + null, + null + ] + }, + "hash": "417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da" +} diff --git a/.sqlx/query-5cc9cfd8d498774fa4b70dad22caf7c57e2ed0b3a2f0773358b1dfef7f7deb4b.json b/.sqlx/query-5cc9cfd8d498774fa4b70dad22caf7c57e2ed0b3a2f0773358b1dfef7f7deb4b.json new file mode 100644 index 00000000..497f77e5 --- /dev/null +++ b/.sqlx/query-5cc9cfd8d498774fa4b70dad22caf7c57e2ed0b3a2f0773358b1dfef7f7deb4b.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT COUNT(*) = 0 AS \"is_finalized!\"\n FROM l1_fragments f\n WHERE f.bundle_id = $1 AND NOT EXISTS (\n SELECT 1\n FROM l1_transaction_fragments tf\n JOIN l1_blob_transaction t ON tf.transaction_id = t.id\n WHERE tf.fragment_id = f.id AND t.state = $2\n )\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "is_finalized!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int4", + "Int2" + ] + }, + "nullable": [ + null + ] + }, + "hash": "5cc9cfd8d498774fa4b70dad22caf7c57e2ed0b3a2f0773358b1dfef7f7deb4b" +} diff --git a/.sqlx/query-b6989cabec71adc8953079f03e4dd4b6322c18b97011bffcbe0cc91e46b1d345.json b/.sqlx/query-b6989cabec71adc8953079f03e4dd4b6322c18b97011bffcbe0cc91e46b1d345.json new file mode 100644 index 00000000..d7efbcd8 --- /dev/null +++ b/.sqlx/query-b6989cabec71adc8953079f03e4dd4b6322c18b97011bffcbe0cc91e46b1d345.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO bundle_cost (\n bundle_id, cost, size, da_block_height, is_finalized\n ) VALUES (\n $1, $2, $3, $4, $5\n )\n ON CONFLICT (bundle_id) DO UPDATE SET\n cost = bundle_cost.cost + EXCLUDED.cost,\n size = bundle_cost.size + EXCLUDED.size,\n da_block_height = EXCLUDED.da_block_height,\n is_finalized = EXCLUDED.is_finalized\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Numeric", + "Int8", + "Int8", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "b6989cabec71adc8953079f03e4dd4b6322c18b97011bffcbe0cc91e46b1d345" +} diff --git a/.sqlx/query-c820bfe642b85081bd8a594d94ff4524a3889215e5056401b4bcd5fe4b027b58.json b/.sqlx/query-c820bfe642b85081bd8a594d94ff4524a3889215e5056401b4bcd5fe4b027b58.json new file mode 100644 index 00000000..c014fb1a --- /dev/null +++ b/.sqlx/query-c820bfe642b85081bd8a594d94ff4524a3889215e5056401b4bcd5fe4b027b58.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT COUNT(*) = 0 AS \"is_finalized!\"\n FROM l1_fragments f\n WHERE f.bundle_id = $1 AND NOT EXISTS (\n SELECT 1\n FROM l1_transaction_fragments tf\n JOIN l1_blob_transaction t ON tf.transaction_id = t.id\n WHERE tf.fragment_id = f.id AND t.state = $2\n )\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "is_finalized!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int4", + "Int2" + ] + }, + "nullable": [ + null + ] + }, + "hash": "c820bfe642b85081bd8a594d94ff4524a3889215e5056401b4bcd5fe4b027b58" +} diff --git a/.sqlx/query-cc10c6369b02a35fee75d611dbd8bf096f832cde79333f23a28dbef155c6526e.json b/.sqlx/query-cc10c6369b02a35fee75d611dbd8bf096f832cde79333f23a28dbef155c6526e.json new file mode 100644 index 00000000..bdc72651 --- /dev/null +++ b/.sqlx/query-cc10c6369b02a35fee75d611dbd8bf096f832cde79333f23a28dbef155c6526e.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO bundle_cost (\n bundle_id, cost, size, da_block_height, is_finalized\n ) VALUES (\n $1, $2, $3, $4, $5\n )\n ON CONFLICT (bundle_id) DO UPDATE SET\n cost = bundle_cost.cost + EXCLUDED.cost,\n size = bundle_cost.size + EXCLUDED.size,\n da_block_height = EXCLUDED.da_block_height,\n is_finalized = EXCLUDED.is_finalized\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Numeric", + "Int8", + "Int8", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "cc10c6369b02a35fee75d611dbd8bf096f832cde79333f23a28dbef155c6526e" +} diff --git a/.sqlx/query-f1ac90602eca2f5966383dc436593f3050820931aeea93d35239f980a22ced1f.json b/.sqlx/query-f1ac90602eca2f5966383dc436593f3050820931aeea93d35239f980a22ced1f.json new file mode 100644 index 00000000..f7ab6859 --- /dev/null +++ b/.sqlx/query-f1ac90602eca2f5966383dc436593f3050820931aeea93d35239f980a22ced1f.json @@ -0,0 +1,59 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n bc.bundle_id,\n bc.cost,\n bc.size,\n bc.da_block_height,\n bc.is_finalized,\n b.start_height,\n b.end_height\n FROM\n bundle_cost bc\n JOIN bundles b ON bc.bundle_id = b.id\n WHERE\n b.start_height >= $1 AND bc.is_finalized = TRUE\n ORDER BY\n b.start_height ASC\n LIMIT $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bundle_id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "cost", + "type_info": "Numeric" + }, + { + "ordinal": 2, + "name": "size", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "da_block_height", + "type_info": "Int8" + }, + { + "ordinal": 4, + "name": "is_finalized", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "start_height", + "type_info": "Int8" + }, + { + "ordinal": 6, + "name": "end_height", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "f1ac90602eca2f5966383dc436593f3050820931aeea93d35239f980a22ced1f" +} diff --git a/Cargo.lock b/Cargo.lock index 1e1d9790..393b674e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4605,6 +4605,7 @@ dependencies = [ "mockall", "nonempty", "rand", + "serde", "sqlx", "thiserror", "trait-variant", diff --git a/committer/src/api.rs b/committer/src/api.rs index 9deb3c47..60d076b0 100644 --- a/committer/src/api.rs +++ b/committer/src/api.rs @@ -8,32 +8,40 @@ use actix_web::{ error::InternalError, get, http::StatusCode, web, App, HttpResponse, HttpServer, Responder, }; use ports::storage::Storage; -use services::{HealthReporter, StatusReporter}; +use serde::Deserialize; +use services::{CostReporter, HealthReporter, StatusReporter}; use crate::{ - config::Config, + config::{Config, Internal}, errors::{Error, Result}, Database, }; pub async fn launch_api_server( config: &Config, + internal_config: &Internal, metrics_registry: Registry, - storage: impl Storage + 'static, + storage: impl Storage + 'static + Clone, fuel_health_check: HealthChecker, eth_health_check: HealthChecker, ) -> Result<()> { let metrics_registry = Arc::new(metrics_registry); - let status_reporter = Arc::new(StatusReporter::new(storage)); + let status_reporter = Arc::new(StatusReporter::new(storage.clone())); let health_reporter = Arc::new(HealthReporter::new(fuel_health_check, eth_health_check)); + let cost_reporter = Arc::new(CostReporter::new( + storage, + internal_config.cost_request_limit, + )); HttpServer::new(move || { App::new() .app_data(web::Data::new(Arc::clone(&metrics_registry))) .app_data(web::Data::new(Arc::clone(&status_reporter))) .app_data(web::Data::new(Arc::clone(&health_reporter))) + .app_data(web::Data::new(Arc::clone(&cost_reporter))) .service(status) .service(metrics) .service(health) + .service(costs) }) .bind((config.app.host, config.app.port)) .map_err(|e| Error::Other(e.to_string()))? @@ -80,6 +88,28 @@ async fn metrics(registry: web::Data>) -> impl Responder { std::result::Result::<_, InternalError<_>>::Ok(text) } +#[derive(Deserialize)] +struct CostQueryParams { + from_height: u32, + limit: Option, +} + +#[get("/v1/costs")] +async fn costs( + data: web::Data>>, + query: web::Query, +) -> impl Responder { + let limit = query.limit.unwrap_or(100); + + match data.get_costs(query.from_height, limit).await { + Ok(bundle_costs) => HttpResponse::Ok().json(bundle_costs), + Err(services::Error::Other(e)) => { + HttpResponse::from_error(InternalError::new(e, StatusCode::BAD_REQUEST)) + } + Err(e) => HttpResponse::from_error(map_to_internal_err(e)), + } +} + fn map_to_internal_err(error: impl std::error::Error) -> InternalError { InternalError::new(error.to_string(), StatusCode::INTERNAL_SERVER_ERROR) } diff --git a/committer/src/config.rs b/committer/src/config.rs index bdeff4e9..e4fa95b7 100644 --- a/committer/src/config.rs +++ b/committer/src/config.rs @@ -196,6 +196,7 @@ pub struct Internal { pub fuel_errors_before_unhealthy: usize, pub eth_errors_before_unhealthy: usize, pub balance_update_interval: Duration, + pub cost_request_limit: usize, } impl Default for Internal { @@ -204,6 +205,7 @@ impl Default for Internal { fuel_errors_before_unhealthy: 3, eth_errors_before_unhealthy: 3, balance_update_interval: Duration::from_secs(10), + cost_request_limit: 1000, } } } diff --git a/committer/src/main.rs b/committer/src/main.rs index d6e6222c..54b51182 100644 --- a/committer/src/main.rs +++ b/committer/src/main.rs @@ -104,6 +104,7 @@ async fn main() -> Result<()> { launch_api_server( &config, + &internal_config, metrics_registry, storage.clone(), fuel_health_check, diff --git a/e2e/src/committer.rs b/e2e/src/committer.rs index 2aa64fa4..fc654fc0 100644 --- a/e2e/src/committer.rs +++ b/e2e/src/committer.rs @@ -1,7 +1,7 @@ use std::{path::Path, time::Duration}; use anyhow::Context; -use ports::types::Address; +use ports::types::{Address, BundleCost}; use url::Url; #[derive(Default)] @@ -266,4 +266,21 @@ impl CommitterProcess { .expect("metric format to be in the format 'NAME VAL'") .parse()?) } + + pub async fn fetch_costs( + &self, + from_height: u32, + limit: usize, + ) -> anyhow::Result> { + let response = reqwest::get(format!( + "http://localhost:{}/v1/costs?from_height={}&limit={}", + self.port, from_height, limit + )) + .await? + .error_for_status()? + .json() + .await?; + + Ok(response) + } } diff --git a/e2e/src/lib.rs b/e2e/src/lib.rs index 9e2f253a..06ca2c4d 100644 --- a/e2e/src/lib.rs +++ b/e2e/src/lib.rs @@ -100,6 +100,9 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } + let bundle_cost = stack.committer.fetch_costs(0, 10).await?.pop().unwrap(); + assert!(bundle_cost.cost > 0); + Ok(()) } diff --git a/packages/eth/src/websocket/connection.rs b/packages/eth/src/websocket/connection.rs index afb3852e..66a4504f 100644 --- a/packages/eth/src/websocket/connection.rs +++ b/packages/eth/src/websocket/connection.rs @@ -438,9 +438,16 @@ impl WsConnection { let block_number = Self::extract_block_number_from_receipt(&tx_receipt)?; + let fee = tx_receipt + .gas_used + .saturating_mul(tx_receipt.effective_gas_price); + let blob_fee = Self::extract_blob_fee_from_receipt(&tx_receipt); + Ok(Some(TransactionResponse::new( block_number, tx_receipt.status(), + fee, + blob_fee, ))) } @@ -449,6 +456,13 @@ impl WsConnection { Error::Other("transaction receipt does not contain block number".to_string()) }) } + + fn extract_blob_fee_from_receipt(receipt: &TransactionReceipt) -> u128 { + match (receipt.blob_gas_used, receipt.blob_gas_price) { + (Some(gas_used), Some(gas_price)) => gas_used.saturating_mul(gas_price), + _ => 0, + } + } } #[cfg(test)] diff --git a/packages/ports/Cargo.toml b/packages/ports/Cargo.toml index 4db99ed0..5c9f844d 100644 --- a/packages/ports/Cargo.toml +++ b/packages/ports/Cargo.toml @@ -19,6 +19,7 @@ itertools = { workspace = true, features = ["use_std"], optional = true } mockall = { workspace = true, optional = true } nonempty = { workspace = true } rand = { workspace = true, optional = true } +serde = { workspace = true } sqlx = { workspace = true, features = ["chrono"] } thiserror = { workspace = true, optional = true } trait-variant = { workspace = true, optional = true } diff --git a/packages/ports/src/ports/storage.rs b/packages/ports/src/ports/storage.rs index 67a3804b..8eb8079a 100644 --- a/packages/ports/src/ports/storage.rs +++ b/packages/ports/src/ports/storage.rs @@ -12,8 +12,8 @@ use itertools::Itertools; pub use sqlx::types::chrono::{DateTime, Utc}; use crate::types::{ - BlockSubmission, BlockSubmissionTx, CollectNonEmpty, CompressedFuelBlock, Fragment, L1Tx, - NonEmpty, NonNegative, TransactionState, + BlockSubmission, BlockSubmissionTx, BundleCost, CollectNonEmpty, CompressedFuelBlock, Fragment, + L1Tx, NonEmpty, NonNegative, TransactionCostUpdate, TransactionState, }; #[derive(Debug, thiserror::Error)] @@ -192,11 +192,17 @@ pub trait Storage: Send + Sync { ) -> Result>; async fn fragments_submitted_by_tx(&self, tx_hash: [u8; 32]) -> Result>; async fn last_time_a_fragment_was_finalized(&self) -> Result>>; - async fn batch_update_tx_states( + async fn update_tx_states_and_costs( &self, selective_changes: Vec<([u8; 32], TransactionState)>, noncewide_changes: Vec<([u8; 32], u32, TransactionState)>, + cost_per_tx: Vec, ) -> Result<()>; + async fn get_finalized_costs( + &self, + from_block_height: u32, + limit: usize, + ) -> Result>; } impl Storage for Arc { @@ -241,11 +247,13 @@ impl Storage for Arc { ) -> Result>; async fn fragments_submitted_by_tx(&self, tx_hash: [u8; 32]) -> Result>; async fn last_time_a_fragment_was_finalized(&self) -> Result>>; - async fn batch_update_tx_states( + async fn update_tx_states_and_costs( &self, selective_changes: Vec<([u8; 32], TransactionState)>, noncewide_changes: Vec<([u8; 32], u32, TransactionState)>, + cost_per_tx: Vec, ) -> Result<()>; + async fn get_finalized_costs(&self, from_block_height: u32, limit: usize) -> Result>; } } } @@ -292,11 +300,13 @@ impl Storage for &T { ) -> Result>; async fn fragments_submitted_by_tx(&self, tx_hash: [u8; 32]) -> Result>; async fn last_time_a_fragment_was_finalized(&self) -> Result>>; - async fn batch_update_tx_states( + async fn update_tx_states_and_costs( &self, selective_changes: Vec<([u8; 32], TransactionState)>, noncewide_changes: Vec<([u8; 32], u32, TransactionState)>, + cost_per_tx: Vec, ) -> Result<()>; + async fn get_finalized_costs(&self, from_block_height: u32, limit: usize) -> Result>; } } } diff --git a/packages/ports/src/types.rs b/packages/ports/src/types.rs index 83da0d4b..27713fa5 100644 --- a/packages/ports/src/types.rs +++ b/packages/ports/src/types.rs @@ -7,6 +7,7 @@ mod non_empty; pub use non_empty::*; mod block_submission; +mod bundle_cost; mod fragment; #[cfg(feature = "l1")] mod fuel_block_committed_on_l1; @@ -16,6 +17,7 @@ mod state_submission; mod transactions; pub use block_submission::*; +pub use bundle_cost::*; pub use fragment::*; #[cfg(feature = "l1")] pub use fuel_block_committed_on_l1::*; diff --git a/packages/ports/src/types/bundle_cost.rs b/packages/ports/src/types/bundle_cost.rs new file mode 100644 index 00000000..6f45ae0b --- /dev/null +++ b/packages/ports/src/types/bundle_cost.rs @@ -0,0 +1,22 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone)] +pub struct TransactionCostUpdate { + pub tx_hash: [u8; 32], + pub total_fee: u128, + pub da_block_height: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BundleCost { + // total cost of the bundle + pub cost: u128, + // total size of the data contained in the bundle + pub size: u64, + // da height of the final transaction carrying the bundle + pub da_block_height: u64, + // starting height of the block contained block range + pub start_height: u64, + // ending height of the block contained block range (inclusive) + pub end_height: u64, +} diff --git a/packages/ports/src/types/transactions.rs b/packages/ports/src/types/transactions.rs index 91cc4b8c..b877a44f 100644 --- a/packages/ports/src/types/transactions.rs +++ b/packages/ports/src/types/transactions.rs @@ -5,6 +5,7 @@ pub enum TransactionState { Pending, IncludedInBlock, Finalized(DateTime), + SqueezedOut, Failed, } @@ -12,13 +13,17 @@ pub enum TransactionState { pub struct TransactionResponse { block_number: u64, succeeded: bool, + fee: u128, + blob_fee: u128, } impl TransactionResponse { - pub fn new(block_number: u64, succeeded: bool) -> Self { + pub fn new(block_number: u64, succeeded: bool, fee: u128, blob_fee: u128) -> Self { Self { block_number, succeeded, + fee, + blob_fee, } } @@ -30,6 +35,10 @@ impl TransactionResponse { self.succeeded } + pub fn total_fee(&self) -> u128 { + self.fee.saturating_add(self.blob_fee) + } + pub fn confirmations(&self, current_block_number: u64) -> u64 { if !self.succeeded() { return 0; diff --git a/packages/services/src/block_committer.rs b/packages/services/src/block_committer.rs index 02fd022c..0f59ad42 100644 --- a/packages/services/src/block_committer.rs +++ b/packages/services/src/block_committer.rs @@ -451,7 +451,7 @@ mod tests { let fuel_adapter = given_fetcher(vec![latest_block]); let db = db_with_submissions(vec![0, 2, 4]).await; - let tx_response = TransactionResponse::new(latest_height as u64, true); + let tx_response = TransactionResponse::new(latest_height as u64, true, 100, 0); let l1 = given_l1_that_expects_transaction_response(latest_height, [4; 32], Some(tx_response)); diff --git a/packages/services/src/cost_reporter.rs b/packages/services/src/cost_reporter.rs new file mode 100644 index 00000000..e659f4bd --- /dev/null +++ b/packages/services/src/cost_reporter.rs @@ -0,0 +1,36 @@ +use ports::{storage::Storage, types::BundleCost}; + +use crate::{Error, Result}; + +pub struct CostReporter { + storage: Db, + request_limit: usize, +} + +impl CostReporter { + pub fn new(storage: Db, request_limit: usize) -> Self { + Self { + storage, + request_limit, + } + } +} + +impl CostReporter +where + Db: Storage, +{ + pub async fn get_costs(&self, from_block_height: u32, limit: usize) -> Result> { + if limit > self.request_limit { + return Err(Error::Other(format!( + "requested: {} items, but limit is: {}", + limit, self.request_limit + ))); + } + + Ok(self + .storage + .get_finalized_costs(from_block_height, limit) + .await?) + } +} diff --git a/packages/services/src/lib.rs b/packages/services/src/lib.rs index 36c22bcf..7f4e11f3 100644 --- a/packages/services/src/lib.rs +++ b/packages/services/src/lib.rs @@ -1,6 +1,7 @@ mod block_bundler; mod block_committer; mod block_importer; +mod cost_reporter; mod health_reporter; mod state_committer; mod state_listener; @@ -12,6 +13,7 @@ pub use block_bundler::{ }; pub use block_committer::BlockCommitter; pub use block_importer::BlockImporter; +pub use cost_reporter::CostReporter; pub use health_reporter::HealthReporter; pub use state_committer::{Config as StateCommitterConfig, StateCommitter}; pub use state_listener::StateListener; @@ -247,6 +249,8 @@ pub(crate) mod test_utils { Ok(Some(TransactionResponse::new( height, matches!(status, TxStatus::Success), + 100, + 100, ))) }) }); @@ -279,6 +283,8 @@ pub(crate) mod test_utils { Ok(Some(TransactionResponse::new( height.into(), matches!(status, TxStatus::Success), + 100, + 100, ))) }) }); @@ -313,6 +319,8 @@ pub(crate) mod test_utils { Ok(Some(TransactionResponse::new( height.into(), matches!(status, TxStatus::Success), + 100, + 100, ))) }) }); diff --git a/packages/services/src/state_listener.rs b/packages/services/src/state_listener.rs index c45123f9..cba42d4b 100644 --- a/packages/services/src/state_listener.rs +++ b/packages/services/src/state_listener.rs @@ -7,7 +7,7 @@ use metrics::{ use ports::{ clock::Clock, storage::Storage, - types::{L1Tx, TransactionState}, + types::{L1Tx, TransactionCostUpdate, TransactionState}, }; use tracing::info; @@ -54,6 +54,8 @@ where let mut selective_change = vec![]; let mut noncewide_changes = vec![]; + let mut cost_per_tx = vec![]; + for tx in non_finalized_txs { if skip_nonces.contains(&tx.nonce) { continue; @@ -120,6 +122,11 @@ where // st tx to finalized and all txs with the same nonce to failed let now = self.clock.now(); noncewide_changes.push((tx.hash, tx.nonce, TransactionState::Finalized(now))); + cost_per_tx.push(TransactionCostUpdate { + tx_hash: tx.hash, + total_fee: tx_response.total_fee(), + da_block_height: tx_response.block_number(), + }); self.metrics.last_finalization_time.set(now.timestamp()); @@ -137,7 +144,7 @@ where .collect(); self.storage - .batch_update_tx_states(selective_change, noncewide_changes) + .update_tx_states_and_costs(selective_change, noncewide_changes, cost_per_tx) .await?; Ok(()) @@ -449,7 +456,9 @@ mod tests { mock.expect_get_transaction_response() .once() .with(eq(tx_hash)) - .return_once(|_| Box::pin(async { Ok(Some(TransactionResponse::new(1, true))) })); + .return_once(|_| { + Box::pin(async { Ok(Some(TransactionResponse::new(1, true, 100, 10))) }) + }); mock.expect_get_block_number() .returning(|| Box::pin(async { Ok(L1Height::from(1u32)) })); @@ -659,7 +668,14 @@ mod tests { .with(eq(replacement_tx_hash)) .once() .return_once(move |_| { - Box::pin(async move { Ok(Some(TransactionResponse::new(current_height, true))) }) + Box::pin(async move { + Ok(Some(TransactionResponse::new( + current_height, + true, + 100, + 10, + ))) + }) }); let mut listener = StateListener::new( @@ -750,6 +766,8 @@ mod tests { Ok(Some(TransactionResponse::new( current_height - blocks_to_finalize, replacement_tx_succeeded, + 100, + 10, ))) }) }); diff --git a/packages/storage/migrations/0007_cost_tracking.sql b/packages/storage/migrations/0007_cost_tracking.sql new file mode 100644 index 00000000..96bc4bbd --- /dev/null +++ b/packages/storage/migrations/0007_cost_tracking.sql @@ -0,0 +1,41 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS bundle_cost ( + bundle_id INTEGER PRIMARY KEY REFERENCES bundles(id), + da_block_height BIGINT NOT NULL, -- DA block height of the last transaction in the bundle + cost NUMERIC(39, 0) NOT NULL, + size BIGINT NOT NULL, + is_finalized BOOLEAN NOT NULL +); + +ALTER TABLE bundle_cost + ADD CONSTRAINT bundle_cost_da_block_height_check + CHECK ( + da_block_height >= 0 +); + +ALTER TABLE bundle_cost + ADD CONSTRAINT bundle_cost_cost_check + CHECK ( + cost >= 0 +); + +ALTER TABLE bundle_cost + ADD CONSTRAINT bundle_cost_size_check + CHECK ( + size >= 0 +); + +ALTER TABLE l1_blob_transaction + DROP CONSTRAINT l1_blob_transaction_state_check; + +ALTER TABLE l1_blob_transaction + ADD CONSTRAINT l1_blob_transaction_state_check + CHECK ( + state IN (0, 1, 2, 3, 4) + AND (state != 1 OR finalized_at IS NOT NULL) +); + +CREATE INDEX idx_bundles_start_height ON bundles(start_height); + +COMMIT; \ No newline at end of file diff --git a/packages/storage/src/lib.rs b/packages/storage/src/lib.rs index a0c63f94..3510d172 100644 --- a/packages/storage/src/lib.rs +++ b/packages/storage/src/lib.rs @@ -12,8 +12,8 @@ mod postgres; use ports::{ storage::{BundleFragment, Result, SequentialFuelBlocks, Storage}, types::{ - BlockSubmission, BlockSubmissionTx, CompressedFuelBlock, DateTime, Fragment, L1Tx, - NonEmpty, NonNegative, TransactionState, Utc, + BlockSubmission, BlockSubmissionTx, BundleCost, CompressedFuelBlock, DateTime, Fragment, + L1Tx, NonEmpty, NonNegative, TransactionCostUpdate, TransactionState, Utc, }, }; pub use postgres::{DbConfig, Postgres}; @@ -134,15 +134,24 @@ impl Storage for Postgres { Ok(self._has_nonfinalized_txs().await?) } - async fn batch_update_tx_states( + async fn update_tx_states_and_costs( &self, selective_changes: Vec<([u8; 32], TransactionState)>, noncewide_changes: Vec<([u8; 32], u32, TransactionState)>, + cost_per_tx: Vec, ) -> Result<()> { Ok(self - ._batch_update_tx_states(selective_changes, noncewide_changes) + ._update_tx_states_and_costs(selective_changes, noncewide_changes, cost_per_tx) .await?) } + + async fn get_finalized_costs( + &self, + from_block_height: u32, + limit: usize, + ) -> Result> { + Ok(self._get_finalized_costs(from_block_height, limit).await?) + } } #[cfg(test)] @@ -182,22 +191,23 @@ mod tests { async fn ensure_some_fragments_exists_in_the_db( storage: impl Storage, + range: RangeInclusive, ) -> NonEmpty> { let next_id = storage.next_bundle_id().await.unwrap(); storage .insert_bundle_and_fragments( next_id, - 0..=0, + range, nonempty!( Fragment { data: nonempty![0], - unused_bytes: 1000, - total_bytes: 100.try_into().unwrap() + unused_bytes: 100, + total_bytes: 1000.try_into().unwrap() }, Fragment { data: nonempty![1], - unused_bytes: 1000, - total_bytes: 100.try_into().unwrap() + unused_bytes: 100, + total_bytes: 1000.try_into().unwrap() } ), ) @@ -392,7 +402,7 @@ mod tests { // given let storage = start_db().await; - let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage).await; + let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage, 0..=0).await; let tx = L1Tx { hash: rand::random::<[u8; 32]>(), ..Default::default() @@ -407,7 +417,7 @@ mod tests { // when let changes = vec![(hash, nonce, TransactionState::Finalized(finalization_time))]; storage - .batch_update_tx_states(vec![], changes) + .update_tx_states_and_costs(vec![], changes, vec![]) .await .unwrap(); @@ -755,7 +765,7 @@ mod tests { // given let storage = start_db().await; - let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage).await; + let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage, 0..=0).await; let hash = rand::random::<[u8; 32]>(); let tx = L1Tx { hash, @@ -777,7 +787,7 @@ mod tests { // given let storage = start_db().await; - let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage).await; + let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage, 0..=0).await; let (fragment_1, fragment_2) = (fragment_ids[0], fragment_ids[1]); let inserted_1 = L1Tx { hash: rand::random::<[u8; 32]>(), @@ -808,4 +818,191 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn can_update_costs() -> Result<()> { + // given + let storage = start_db().await; + + let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage, 0..=0).await; + let tx = L1Tx { + hash: rand::random::<[u8; 32]>(), + ..Default::default() + }; + let hash = tx.hash; + let nonce = tx.nonce; + + storage.record_pending_tx(tx, fragment_ids).await.unwrap(); + + let finalization_time = Utc::now(); + + // when + let changes = vec![(hash, nonce, TransactionState::Finalized(finalization_time))]; + let cost_per_tx = TransactionCostUpdate { + tx_hash: hash, + total_fee: 1000u128, + da_block_height: 5000u64, + }; + storage + .update_tx_states_and_costs(vec![], changes, vec![cost_per_tx.clone()]) + .await + .unwrap(); + + // then + let bundle_cost = storage.get_finalized_costs(0, 10).await?; + + assert_eq!(bundle_cost.len(), 1); + assert_eq!(bundle_cost[0].cost, cost_per_tx.total_fee); + assert_eq!(bundle_cost[0].da_block_height, cost_per_tx.da_block_height); + + Ok(()) + } + + async fn ensure_fragments_have_transaction( + storage: &impl Storage, + fragment_ids: NonEmpty>, + state: TransactionState, + ) -> [u8; 32] { + let tx_hash = rand::random::<[u8; 32]>(); + let tx = L1Tx { + hash: tx_hash, + nonce: rand::random(), + ..Default::default() + }; + storage + .record_pending_tx(tx.clone(), fragment_ids) + .await + .unwrap(); + + let changes = vec![(tx.hash, tx.nonce, state)]; + storage + .update_tx_states_and_costs(vec![], changes, vec![]) + .await + .expect("tx state should update"); + + tx_hash + } + + async fn ensure_finalized_fragments_exist_in_the_db( + storage: &impl Storage, + range: RangeInclusive, + total_fee: u128, + da_block_height: u64, + ) -> NonEmpty> { + let fragment_in_db = ensure_some_fragments_exists_in_the_db(storage, range).await; + + let state = TransactionState::Finalized(Utc::now()); + let tx_hash = + ensure_fragments_have_transaction(storage, fragment_in_db.clone(), state).await; + + let cost_per_tx = TransactionCostUpdate { + tx_hash, + total_fee, + da_block_height, + }; + storage + .update_tx_states_and_costs(vec![], vec![], vec![cost_per_tx]) + .await + .expect("cost update shouldn't fail"); + + fragment_in_db + } + + #[tokio::test] + async fn costs_returned_only_for_finalized_bundles() { + // given + let storage = start_db().await; + let cost = 1000u128; + let da_height = 5000u64; + let bundle_range = 1..=2; + + ensure_finalized_fragments_exist_in_the_db(&storage, bundle_range.clone(), cost, da_height) + .await; + + // add submitted and unsubmitted fragments + let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage, 3..=5).await; + ensure_fragments_have_transaction( + &storage, + fragment_ids, + TransactionState::IncludedInBlock, + ) + .await; + ensure_some_fragments_exists_in_the_db(&storage, 6..=10).await; + + // when + let costs = storage.get_finalized_costs(0, 10).await.unwrap(); + + // then + assert_eq!(costs.len(), 1); + + let bundle_cost = &costs[0]; + assert_eq!(bundle_cost.start_height, *bundle_range.start() as u64); + assert_eq!(bundle_cost.end_height, *bundle_range.end() as u64); + assert_eq!(bundle_cost.cost, cost); + assert_eq!(bundle_cost.da_block_height, da_height); + } + + #[tokio::test] + async fn costs_returned_only_for_finalized_with_replacement_txs() { + // given + let storage = start_db().await; + let cost = 1000u128; + let da_height = 5000u64; + let bundle_range = 1..=2; + + let fragment_ids = ensure_finalized_fragments_exist_in_the_db( + &storage, + bundle_range.clone(), + cost, + da_height, + ) + .await; + // simulate replaced txs + ensure_fragments_have_transaction(&storage, fragment_ids, TransactionState::SqueezedOut) + .await; + ensure_some_fragments_exists_in_the_db(&storage, 6..=10).await; + + // when + let costs = storage.get_finalized_costs(0, 10).await.unwrap(); + + // then + assert_eq!(costs.len(), 1); + + let bundle_cost = &costs[0]; + assert_eq!(bundle_cost.start_height, *bundle_range.start() as u64); + assert_eq!(bundle_cost.end_height, *bundle_range.end() as u64); + assert_eq!(bundle_cost.cost, cost); + assert_eq!(bundle_cost.da_block_height, da_height); + } + + #[tokio::test] + async fn respects_from_block_height_and_limit_in_get_finalized_costs() -> Result<()> { + // given + let storage = start_db().await; + + for i in 0..5 { + let start_height = i * 10 + 1; + let end_height = start_height + 9; + let block_range = start_height..=end_height; + + ensure_finalized_fragments_exist_in_the_db(&storage, block_range, 1000u128, 5000u64) + .await; + } + + // when + let from_block_height = 21; + let limit = 2; + let finalized_costs = storage + .get_finalized_costs(from_block_height, limit) + .await?; + + // then + assert_eq!(finalized_costs.len(), 2); + + for bc in &finalized_costs { + assert!(bc.start_height >= from_block_height as u64); + } + + Ok(()) + } } diff --git a/packages/storage/src/mappings/tables.rs b/packages/storage/src/mappings/tables.rs index 75f56372..503de05a 100644 --- a/packages/storage/src/mappings/tables.rs +++ b/packages/storage/src/mappings/tables.rs @@ -100,7 +100,7 @@ impl L1FuelBlockSubmissionTx { } // Assumes that the BigDecimal is non-negative and has no fractional part -fn bigdecimal_to_u128(value: BigDecimal) -> Result { +pub(crate) fn bigdecimal_to_u128(value: BigDecimal) -> Result { let (digits, scale) = value.clone().into_bigint_and_exponent(); if scale > 0 { @@ -118,7 +118,7 @@ fn bigdecimal_to_u128(value: BigDecimal) -> Result { Ok(result) } -fn u128_to_bigdecimal(value: u128) -> BigDecimal { +pub(crate) fn u128_to_bigdecimal(value: u128) -> BigDecimal { let digits = BigInt::from(value); BigDecimal::new(digits, 0) } @@ -399,8 +399,9 @@ impl TryFrom for ports::types::L1Tx { pub enum L1TxState { Pending, Finalized, - Failed, + SqueezedOut, IncludedInBlock, + Failed, } impl From for i16 { @@ -410,6 +411,7 @@ impl From for i16 { L1TxState::Finalized => 1, L1TxState::Failed => 2, L1TxState::IncludedInBlock => 3, + L1TxState::SqueezedOut => 4, } } } @@ -421,6 +423,62 @@ impl From<&TransactionState> for L1TxState { TransactionState::IncludedInBlock => Self::IncludedInBlock, TransactionState::Finalized(_) => Self::Finalized, TransactionState::Failed => Self::Failed, + TransactionState::SqueezedOut => Self::SqueezedOut, } } } + +#[derive(sqlx::FromRow)] +pub struct BundleCost { + pub bundle_id: i32, + pub cost: BigDecimal, + pub size: i64, + pub da_block_height: i64, + pub start_height: i64, + pub end_height: i64, + pub is_finalized: bool, +} + +impl TryFrom for ports::types::BundleCost { + type Error = crate::error::Error; + + fn try_from(value: BundleCost) -> Result { + let cost = bigdecimal_to_u128(value.cost)?; + + let size = value.size.try_into().map_err(|e| { + crate::error::Error::Conversion(format!( + "Invalid db `size` ({}). Reason: {e}", + value.size + )) + })?; + + let da_block_height = value.da_block_height.try_into().map_err(|e| { + crate::error::Error::Conversion(format!( + "Invalid db `da_block_height` ({}). Reason: {e}", + value.da_block_height + )) + })?; + + let start_height = value.start_height.try_into().map_err(|e| { + crate::error::Error::Conversion(format!( + "Invalid db `start_height` ({}). Reason: {e}", + value.start_height + )) + })?; + + let end_height = value.end_height.try_into().map_err(|e| { + crate::error::Error::Conversion(format!( + "Invalid db `end_height` ({}). Reason: {e}", + value.end_height + )) + })?; + + Ok(Self { + cost, + size, + da_block_height, + start_height, + end_height, + }) + } +} diff --git a/packages/storage/src/postgres.rs b/packages/storage/src/postgres.rs index f77a3c9e..cd7661f7 100644 --- a/packages/storage/src/postgres.rs +++ b/packages/storage/src/postgres.rs @@ -1,17 +1,18 @@ -use std::ops::RangeInclusive; +use std::{collections::HashMap, ops::RangeInclusive}; +use crate::postgres::tables::u128_to_bigdecimal; use itertools::Itertools; use metrics::{prometheus::IntGauge, RegistersMetrics}; use ports::{ storage::SequentialFuelBlocks, types::{ - BlockSubmission, BlockSubmissionTx, CompressedFuelBlock, DateTime, Fragment, NonEmpty, - NonNegative, TransactionState, TryCollectNonEmpty, Utc, + BlockSubmission, BlockSubmissionTx, BundleCost, CompressedFuelBlock, DateTime, Fragment, + NonEmpty, NonNegative, TransactionCostUpdate, TransactionState, TryCollectNonEmpty, Utc, }, }; use sqlx::{ postgres::{PgConnectOptions, PgPoolOptions}, - QueryBuilder, + PgConnection, QueryBuilder, }; use super::error::{Error, Result}; @@ -59,6 +60,14 @@ impl RegistersMetrics for Postgres { } } +struct BundleCostUpdate { + cost_contribution: u128, + size_contribution: u64, + latest_da_block_height: u64, +} + +type BundleCostUpdates = HashMap; + #[derive(Debug, Clone, serde::Deserialize)] pub struct DbConfig { /// The hostname or IP address of the `PostgreSQL` server. @@ -616,68 +625,253 @@ impl Postgres { Ok(()) } - pub(crate) async fn _batch_update_tx_states( + pub(crate) async fn _update_tx_states_and_costs( &self, selective_changes: Vec<([u8; 32], TransactionState)>, noncewide_changes: Vec<([u8; 32], u32, TransactionState)>, + cost_per_tx: Vec, ) -> Result<()> { let mut tx = self.connection_pool.begin().await?; - for change in selective_changes { - let hash = change.0; - let state = change.1; + self.update_transaction_states(&mut tx, &selective_changes, &noncewide_changes) + .await?; - let finalized_at = match &state { - TransactionState::Finalized(date_time) => Some(*date_time), - _ => None, - }; - let state = i16::from(L1TxState::from(&state)); + self.update_costs(&mut tx, &cost_per_tx).await?; - sqlx::query!( - "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE hash = $3", - state, - finalized_at, - hash.as_slice(), - ) - .execute(&mut *tx) - .await?; + tx.commit().await?; + + Ok(()) + } + + async fn update_transaction_states( + &self, + tx: &mut PgConnection, + selective_changes: &[([u8; 32], TransactionState)], + noncewide_changes: &[([u8; 32], u32, TransactionState)], + ) -> Result<()> { + for (hash, state) in selective_changes { + self.update_transaction_state(tx, hash, state).await?; } - for change in noncewide_changes { - let hash = change.0; - let nonce = change.1; - let state = change.2; + for (hash, nonce, state) in noncewide_changes { + self.update_transactions_noncewide(tx, hash, *nonce, state) + .await?; + } - let finalized_at = match &state { - TransactionState::Finalized(date_time) => Some(*date_time), - _ => None, - }; - let state = i16::from(L1TxState::from(&state)); + Ok(()) + } - sqlx::query!( - "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE nonce = $3", - i16::from(L1TxState::Failed), - Option::>::None, - i64::from(nonce), - ) - .execute(&mut *tx) - .await?; + async fn update_transaction_state( + &self, + tx: &mut PgConnection, + hash: &[u8; 32], + state: &TransactionState, + ) -> Result<()> { + let finalized_at = match state { + TransactionState::Finalized(date_time) => Some(*date_time), + _ => None, + }; + let state_int = i16::from(L1TxState::from(state)); - sqlx::query!( - "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE hash = $3", - state, - finalized_at, - hash.as_slice(), + sqlx::query!( + "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE hash = $3", + state_int, + finalized_at, + hash.as_slice(), + ) + .execute(tx) + .await?; + + Ok(()) + } + + async fn update_transactions_noncewide( + &self, + tx: &mut PgConnection, + hash: &[u8; 32], + nonce: u32, + state: &TransactionState, + ) -> Result<()> { + let finalized_at = match state { + TransactionState::Finalized(date_time) => Some(*date_time), + _ => None, + }; + let state_int = i16::from(L1TxState::from(state)); + + // set all transactions with the same nonce to Failed + sqlx::query!( + "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE nonce = $3", + i16::from(L1TxState::Failed), + Option::>::None, + nonce as i64, + ) + .execute(&mut *tx) + .await?; + + // update the specific transaction + sqlx::query!( + "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE hash = $3", + state_int, + finalized_at, + hash.as_slice(), + ) + .execute(tx) + .await?; + + Ok(()) + } + + async fn update_costs( + &self, + tx: &mut PgConnection, + cost_per_tx: &[TransactionCostUpdate], + ) -> Result<()> { + let bundle_updates = self.process_cost_updates(tx, cost_per_tx).await?; + + for (bundle_id, update) in bundle_updates { + self.update_bundle_cost(tx, bundle_id, &update).await?; + } + + Ok(()) + } + + async fn process_cost_updates( + &self, + tx: &mut PgConnection, + cost_per_tx: &[TransactionCostUpdate], + ) -> Result { + let mut bundle_updates: BundleCostUpdates = HashMap::new(); + + for TransactionCostUpdate { + tx_hash, + total_fee, + da_block_height, + } in cost_per_tx + { + let row = sqlx::query!( + r#" + SELECT + f.bundle_id, + SUM(f.total_bytes)::BIGINT AS total_bytes, + SUM(f.unused_bytes)::BIGINT AS unused_bytes + FROM + l1_blob_transaction t + JOIN l1_transaction_fragments tf ON t.id = tf.transaction_id + JOIN l1_fragments f ON tf.fragment_id = f.id + WHERE + t.hash = $1 + GROUP BY + f.bundle_id + "#, + tx_hash.as_slice() ) - .execute(&mut *tx) + .fetch_one(&mut *tx) .await?; + + let bundle_id = row.bundle_id; + let total_bytes: i64 = row.total_bytes.unwrap_or(0); + let unused_bytes: i64 = row.unused_bytes.unwrap_or(0); + let size_contribution = total_bytes.saturating_sub(unused_bytes) as u64; + + let entry = bundle_updates.entry(bundle_id).or_insert(BundleCostUpdate { + cost_contribution: 0, + size_contribution: 0, + latest_da_block_height: 0, + }); + + entry.cost_contribution = entry.cost_contribution.saturating_add(*total_fee); + entry.size_contribution = entry.size_contribution.saturating_add(size_contribution); + // Update with the latest da_block_height + entry.latest_da_block_height = *da_block_height; } - tx.commit().await?; + Ok(bundle_updates) + } + + async fn update_bundle_cost( + &self, + tx: &mut PgConnection, + bundle_id: i32, + update: &BundleCostUpdate, + ) -> Result<()> { + // Check if any fragment in the bundle is not associated with a finalized transaction + let is_finalized = sqlx::query_scalar!( + r#" + SELECT COUNT(*) = 0 AS "is_finalized!" + FROM l1_fragments f + WHERE f.bundle_id = $1 AND NOT EXISTS ( + SELECT 1 + FROM l1_transaction_fragments tf + JOIN l1_blob_transaction t ON tf.transaction_id = t.id + WHERE tf.fragment_id = f.id AND t.state = $2 + ) + "#, + bundle_id, + i16::from(L1TxState::Finalized), + ) + .fetch_one(&mut *tx) + .await?; + + sqlx::query!( + r#" + INSERT INTO bundle_cost ( + bundle_id, cost, size, da_block_height, is_finalized + ) VALUES ( + $1, $2, $3, $4, $5 + ) + ON CONFLICT (bundle_id) DO UPDATE SET + cost = bundle_cost.cost + EXCLUDED.cost, + size = bundle_cost.size + EXCLUDED.size, + da_block_height = EXCLUDED.da_block_height, + is_finalized = EXCLUDED.is_finalized + "#, + bundle_id, + u128_to_bigdecimal(update.cost_contribution), + i64::try_from(update.size_contribution).unwrap(), + i64::try_from(update.latest_da_block_height).unwrap(), + is_finalized, + ) + .execute(&mut *tx) + .await?; Ok(()) } + pub(crate) async fn _get_finalized_costs( + &self, + from_block_height: u32, + limit: usize, + ) -> Result> { + sqlx::query_as!( + tables::BundleCost, + r#" + SELECT + bc.bundle_id, + bc.cost, + bc.size, + bc.da_block_height, + bc.is_finalized, + b.start_height, + b.end_height + FROM + bundle_cost bc + JOIN bundles b ON bc.bundle_id = b.id + WHERE + b.start_height >= $1 AND bc.is_finalized = TRUE + ORDER BY + b.start_height ASC + LIMIT $2 + "#, + from_block_height as i64, + limit as i64 + ) + .fetch_all(&self.connection_pool) + .await? + .into_iter() + .map(BundleCost::try_from) + .collect::>>() + } + pub(crate) async fn _next_bundle_id(&self) -> Result> { let next_id = sqlx::query!("SELECT nextval(pg_get_serial_sequence('bundles', 'id'))") .fetch_one(&self.connection_pool) @@ -814,9 +1008,18 @@ mod tests { use std::{env, fs, path::Path}; use sqlx::{Executor, PgPool, Row}; + use tokio::time::Instant; use crate::test_instance; + use super::*; + + use ports::{ + storage::Storage, + types::{CollectNonEmpty, Fragment, L1Tx, TransactionState}, + }; + use rand::Rng; + #[tokio::test] async fn test_second_migration_applies_successfully() { let db = test_instance::PostgresProcess::shared() @@ -1075,4 +1278,95 @@ mod tests { let new_fragment_id: i32 = row.try_get("id").unwrap(); assert!(new_fragment_id > 0, "Failed to insert a valid fragment"); } + + #[tokio::test] + async fn stress_test_update_costs() -> Result<()> { + let mut rng = rand::thread_rng(); + + let storage = test_instance::PostgresProcess::shared() + .await + .expect("Failed to initialize PostgresProcess") + .create_random_db() + .await + .expect("Failed to create random test database"); + + let fragments_per_bundle = 1_000_000; + let txs_per_fragment = 100; + + // insert the bundle and fragments + let bundle_id = storage.next_bundle_id().await.unwrap(); + let end_height = rng.gen_range(1..5000); + let range = 0..=end_height; + + // create fragments for the bundle + let fragments = (0..fragments_per_bundle) + .map(|_| Fragment { + data: NonEmpty::from_vec(vec![rng.gen()]).unwrap(), + unused_bytes: rng.gen_range(0..1000), + total_bytes: rng.gen_range(1000..5000).try_into().unwrap(), + }) + .collect::>(); + let fragments = NonEmpty::from_vec(fragments).unwrap(); + + storage + .insert_bundle_and_fragments(bundle_id, range, fragments.clone()) + .await + .unwrap(); + + let fragment_ids = storage + .oldest_nonfinalized_fragments(0, 2) + .await + .unwrap() + .into_iter() + .map(|f| f.id) + .collect_nonempty() + .unwrap(); + + let mut tx_changes = vec![]; + let mut cost_updates = vec![]; + + // for each fragment, create multiple transactions + for _id in fragment_ids.iter() { + for _ in 0..txs_per_fragment { + let tx_hash = rng.gen::<[u8; 32]>(); + let tx = L1Tx { + hash: tx_hash, + nonce: rng.gen(), + ..Default::default() + }; + + storage + .record_pending_tx(tx.clone(), fragment_ids.clone()) + .await + .unwrap(); + + // update transaction state to simulate finalized transactions + let finalization_time = Utc::now(); + tx_changes.push((tx.hash, TransactionState::Finalized(finalization_time))); + + // cost updates + let total_fee = rng.gen_range(1_000_000u128..10_000_000u128); + let da_block_height = rng.gen_range(1_000_000u64..10_000_000u64); + cost_updates.push(TransactionCostUpdate { + tx_hash, + total_fee, + da_block_height, + }); + } + } + + // update transaction states and costs + let start_time = Instant::now(); + + storage + .update_tx_states_and_costs(tx_changes, vec![], cost_updates) + .await + .unwrap(); + + let duration = start_time.elapsed(); + + assert!(duration.as_secs() < 60); + + Ok(()) + } } diff --git a/packages/storage/src/test_instance.rs b/packages/storage/src/test_instance.rs index 9682200e..0712cc10 100644 --- a/packages/storage/src/test_instance.rs +++ b/packages/storage/src/test_instance.rs @@ -8,8 +8,8 @@ use delegate::delegate; use ports::{ storage::{BundleFragment, SequentialFuelBlocks, Storage}, types::{ - BlockSubmission, BlockSubmissionTx, CompressedFuelBlock, DateTime, Fragment, L1Tx, - NonEmpty, NonNegative, TransactionState, Utc, + BlockSubmission, BlockSubmissionTx, BundleCost, CompressedFuelBlock, DateTime, Fragment, + L1Tx, NonEmpty, NonNegative, TransactionCostUpdate, TransactionState, Utc, }, }; use sqlx::Executor; @@ -206,11 +206,13 @@ impl Storage for DbWithProcess { ) -> ports::storage::Result>; async fn fragments_submitted_by_tx(&self, tx_hash: [u8; 32]) -> ports::storage::Result>; async fn last_time_a_fragment_was_finalized(&self) -> ports::storage::Result>>; - async fn batch_update_tx_states( + async fn update_tx_states_and_costs( &self, selective_changes: Vec<([u8; 32], TransactionState)>, noncewide_changes: Vec<([u8; 32], u32, TransactionState)>, + cost_per_tx: Vec, ) -> ports::storage::Result<()>; + async fn get_finalized_costs(&self, from_block_height: u32, limit: usize) -> ports::storage::Result>; } } }