Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txpool api: remove_invalid call improved #6661

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/bin/node/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ kitchensink-runtime = { workspace = true }
sc-client-api = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
sp-state-machine = { workspace = true, default-features = true }
sp-blockchain = { workspace = true, default-features = true }
serde = { workspace = true, default-features = true }
serde_json = { workspace = true, default-features = true }
derive_more = { features = ["display"], workspace = true }
Expand Down
6 changes: 5 additions & 1 deletion substrate/bin/node/bench/src/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
unimplemented!()
}

fn remove_invalid(&self, _hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
fn report_invalid(
&self,
_at: Option<Self::Hash>,
_invalid_tx_errors: &[(TxHash<Self>, Option<sp_blockchain::Error>)],
) -> Vec<Arc<Self::InPoolTransaction>> {
Default::default()
}

Expand Down
4 changes: 2 additions & 2 deletions substrate/client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ where
target: LOG_TARGET,
"[{:?}] Invalid transaction: {} at: {}", pending_tx_hash, e, self.parent_hash
);
unqueue_invalid.push(pending_tx_hash);
unqueue_invalid.push((pending_tx_hash, Some(e)));
},
}
};
Expand All @@ -524,7 +524,7 @@ where
);
}

self.transaction_pool.remove_invalid(&unqueue_invalid);
self.transaction_pool.report_invalid(Some(self.parent_hash), &unqueue_invalid);
Ok(end_reason)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,12 @@ impl TransactionPool for MiddlewarePool {
Ok(watcher.boxed())
}

fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
self.inner_pool.remove_invalid(hashes)
fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: &[(TxHash<Self>, Option<sp_blockchain::Error>)],
) -> Vec<Arc<Self::InPoolTransaction>> {
self.inner_pool.report_invalid(at, invalid_tx_errors)
}

fn status(&self) -> PoolStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ where
}

// Best effort pool removal (tx can already be finalized).
pool.remove_invalid(&[broadcast_state.tx_hash]);
pool.report_invalid(None, &[(broadcast_state.tx_hash, None)]);
});

// Keep track of this entry and the abortable handle.
Expand Down
6 changes: 3 additions & 3 deletions substrate/client/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,17 @@ where
let hashes = bytes_or_hash
.into_iter()
.map(|x| match x {
hash::ExtrinsicOrHash::Hash(h) => Ok(h),
hash::ExtrinsicOrHash::Hash(h) => Ok((h, None)),
hash::ExtrinsicOrHash::Extrinsic(bytes) => {
let xt = Decode::decode(&mut &bytes[..])?;
Ok(self.pool.hash_of(&xt))
Ok((self.pool.hash_of(&xt), None))
},
})
.collect::<Result<Vec<_>>>()?;

Ok(self
.pool
.remove_invalid(&hashes)
.report_invalid(None, &hashes)
.into_iter()
.map(|tx| tx.hash().clone())
.collect())
Expand Down
20 changes: 18 additions & 2 deletions substrate/client/transaction-pool/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,24 @@ pub trait TransactionPool: Send + Sync {
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;

// *** Block production
/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>>;
/// Reports invalid transactions to the transaction pool.
///
/// This function accepts an array of tuples, each containing a transaction hash and an
/// optional error encountered during the transaction execution at a specific (also optional)
/// block.
///
/// The transaction pool implementation decides which transactions to remove. Transactions
/// dependent on invalid ones will also be removed.
///
/// If the tuple's error is None, the transaction will be forcibly removed from the pool.
///
/// The optional `at` parameter provides additional context regarding the block where the error
/// occurred.
fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: &[(TxHash<Self>, Option<sp_blockchain::Error>)],
) -> Vec<Arc<Self::InPoolTransaction>>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also keep old function remove_invalid and add new one, with no optional error/block parameters. Really no strong opinion here.

I also think that triggering invalid event in case of removal the transaction via RPC call is (maybe) undesirable - so maybe we should have pure remove function (which would trigger Dropped, IMO it makes more sense). Any thoughts?


// *** logging
/// Get futures transaction list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,18 +675,49 @@ where
.inspect_err(|_| mempool.remove(xt_hash))
}

/// Intended to remove transactions identified by the given hashes, and any dependent
/// transactions, from the pool. In current implementation this function only outputs the error.
/// Seems that API change is needed here to make this call reasonable.
// todo [#5491]: api change? we need block hash here (assuming we need it at all - could be
// useful for verification for debugging purposes).
fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
if !hashes.is_empty() {
log::debug!(target: LOG_TARGET, "fatp::remove_invalid {}", hashes.len());
log_xt_trace!(target:LOG_TARGET, hashes, "[{:?}] fatp::remove_invalid");
self.metrics
.report(|metrics| metrics.removed_invalid_txs.inc_by(hashes.len() as _));
}
// /// Intended to remove transactions identified by the given hashes, and any dependent
// /// transactions, from the pool. In current implementation this function only outputs the
// error. /// Seems that API change is needed here to make this call reasonable.
// // todo [#5491]: api change? we need block hash here (assuming we need it at all - could be
// // useful for verification for debugging purposes).
// fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
// if !hashes.is_empty() {
// log::debug!(target: LOG_TARGET, "fatp::remove_invalid {}", hashes.len());
// log_xt_trace!(target:LOG_TARGET, hashes, "[{:?}] fatp::remove_invalid");
// self.metrics
// .report(|metrics| metrics.removed_invalid_txs.inc_by(hashes.len() as _));
// }
// Default::default()
// }

/// Reports invalid transactions to the transaction pool.
///
/// This function takes an array of tuples, each consisting of a transaction hash and the
/// corresponding error that occurred during transaction execution at given block.
///
/// The transaction pool implementation will determine which transactions should be
/// removed from the pool. Transactions that depend on invalid transactions will also
/// be removed.
fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: &[(TxHash<Self>, Option<sp_blockchain::Error>)],
) -> Vec<Arc<Self::InPoolTransaction>> {
self.metrics
.report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));

let removed = self.view_store.report_invalid(at, invalid_tx_errors);

self.metrics
.report(|metrics| metrics.removed_invalid_txs.inc_by(removed.len() as _));

// todo (after merging / rebasing)

// depending on error:
// - handle cloned view with view_store replacements
// - remove resulting hashes from mempool and collect Arc<Transaction>
// - send notification using listener (should this be done in view_store)

Default::default()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub struct Metrics {
/// Total number of unwatched transactions in txpool.
pub unwatched_txs: Gauge<U64>,
/// Total number of transactions reported as invalid.
pub reported_invalid_txs: Counter<U64>,
/// Total number of transactions removed as invalid.
pub removed_invalid_txs: Counter<U64>,
/// Total number of finalized transactions.
pub finalized_txs: Counter<U64>,
Expand Down Expand Up @@ -99,10 +101,17 @@ impl MetricsRegistrant for Metrics {
)?,
registry,
)?,
reported_invalid_txs: register(
Counter::new(
"substrate_sub_txpool_reported_invalid_txs_total",
"Total number of transactions reported as invalid.",
)?,
registry,
)?,
removed_invalid_txs: register(
Counter::new(
"substrate_sub_txpool_removed_invalid_txs_total",
"Total number of transactions reported as invalid.",
"Total number of transactions removed as invalid.",
)?,
registry,
)?,
Expand Down
11 changes: 9 additions & 2 deletions substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use super::metrics::MetricsLink as PrometheusMetrics;
use crate::{
common::log_xt::log_xt_trace,
graph::{
self, watcher::Watcher, ExtrinsicFor, ExtrinsicHash, IsValidator, ValidatedTransaction,
ValidatedTransactionFor,
self, watcher::Watcher, ExtrinsicFor, ExtrinsicHash, IsValidator, TransactionFor,
ValidatedTransaction, ValidatedTransactionFor,
},
LOG_TARGET,
};
Expand Down Expand Up @@ -455,4 +455,11 @@ where
);
}
}

pub(crate) fn remove_invalid(
&self,
hashes: &[ExtrinsicHash<ChainApi>],
) -> Vec<TransactionFor<ChainApi>> {
self.pool.validated_pool().remove_invalid(hashes)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ use futures::prelude::*;
use itertools::Itertools;
use parking_lot::RwLock;
use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus, TransactionSource};
use sp_blockchain::TreeRoute;
use sp_runtime::{generic::BlockId, traits::Block as BlockT};
use sp_blockchain::{ApplyExtrinsicFailed, Error as BlockchainError, TreeRoute};
use sp_runtime::{
generic::BlockId,
traits::Block as BlockT,
transaction_validity::{InvalidTransaction, TransactionValidityError},
};
use std::{collections::HashMap, sync::Arc, time::Instant};

/// The helper structure encapsulates all the views.
Expand Down Expand Up @@ -484,4 +488,45 @@ where
futures::future::join_all(finish_revalidation_futures).await;
log::trace!(target:LOG_TARGET,"finish_background_revalidations took {:?}", start.elapsed());
}

pub(crate) fn report_invalid(
&self,
at: Option<Block::Hash>,
invalid_tx_errors: &[(ExtrinsicHash<ChainApi>, Option<BlockchainError>)],
) -> Vec<ExtrinsicHash<ChainApi>> {
let mut remove_from_view = vec![];
let mut remove_from_pool = vec![];

invalid_tx_errors.iter().for_each(|(hash, e)| match e {
Some(BlockchainError::ApplyExtrinsicFailed(ApplyExtrinsicFailed::Validity(
TransactionValidityError::Invalid(
InvalidTransaction::Future | InvalidTransaction::Stale,
),
))) => {
remove_from_view.push(*hash);
},
_ => {
remove_from_pool.push(*hash);
},
});

at.inspect(|at| {
self.get_view_at(*at, true)
.map(|(view, _)| view.remove_invalid(&remove_from_view[..]))
.unwrap_or_default();
});

//todo: duplicated code - we need to remove subtree from every view
// let active_views = self.active_views.read();
// let inactive_views = self.inactive_views.read();
// active_views
// .iter()
// .chain(inactive_views.iter())
// .filter(|(_, view)| view.is_imported(&xt_hash))
// .for_each(|(_, view)| {
// view.remove_subtree(xt_hash, replaced_with);
// });

remove_from_pool
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,13 @@ where
Ok(watcher.into_stream().boxed())
}

fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
let removed = self.pool.validated_pool().remove_invalid(hashes);
fn report_invalid(
&self,
_at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: &[(TxHash<Self>, Option<sp_blockchain::Error>)],
) -> Vec<Arc<Self::InPoolTransaction>> {
let hashes = invalid_tx_errors.iter().map(|(hash, _)| *hash).collect::<Vec<_>>();
let removed = self.pool.validated_pool().remove_invalid(&hashes[..]);
self.metrics
.report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ where
self.0.ready()
}

fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
self.0.remove_invalid(hashes)
fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: &[(TxHash<Self>, Option<sp_blockchain::Error>)],
) -> Vec<Arc<Self::InPoolTransaction>> {
self.0.report_invalid(at, invalid_tx_errors)
}

fn futures(&self) -> Vec<Self::InPoolTransaction> {
Expand Down
Loading