Skip to content

Commit

Permalink
Merge branch 'development' into horizon-sync-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnaveira authored Aug 31, 2023
2 parents f2b0fa0 + b0337cf commit afca199
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {

kernel_hashes.push(kernel.hash());

if mmr_position > end {
return Err(HorizonSyncError::IncorrectResponse(format!(
"Peer sent too many kernels",
)));
}

let mmr_position_u32 = u32::try_from(mmr_position).map_err(|_| HorizonSyncError::InvalidMmrPosition {
at_height: current_header.height(),
mmr_position,
Expand Down Expand Up @@ -559,6 +565,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
avg_latency.add_sample(latency);
let res: SyncUtxosResponse = response?;

if mmr_position > end {
return Err(HorizonSyncError::IncorrectResponse(format!(
"Peer sent too many outputs",
)));
}

if res.mmr_index != 0 && res.mmr_index != mmr_position {
return Err(HorizonSyncError::IncorrectResponse(format!(
"Expected MMR position of {} but got {}",
Expand Down Expand Up @@ -737,6 +749,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
last_sync_timer = Instant::now();
}

if !unpruned_outputs.is_empty() {
return Err(HorizonSyncError::IncorrectResponse(
"Sync node sent leftover unpruned outputs".to_string(),
));
}

if mmr_position != end {
return Err(HorizonSyncError::IncorrectResponse(
"Sync node did not send all utxos requested".to_string(),
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ pub enum MempoolError {
BlockingTaskError(#[from] JoinError),
#[error("Internal error: {0}")]
InternalError(String),
#[error("Mempool indexes out of sync: transaction exists in txs_by_signature but not in tx_by_key")]
IndexOutOfSync,
}
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Mempool {
&self,
excess_sigs: Vec<PrivateKey>,
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
self.with_read_access(move |storage| Ok(storage.retrieve_by_excess_sigs(&excess_sigs)))
self.with_read_access(move |storage| storage.retrieve_by_excess_sigs(&excess_sigs))
.await
}

Expand Down
19 changes: 12 additions & 7 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,19 @@ impl MempoolStorage {
Ok(results.retrieved_transactions)
}

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
let (found_txns, remaining) = self.unconfirmed_pool.retrieve_by_excess_sigs(excess_sigs);
let (found_published_transactions, remaining) = self.reorg_pool.retrieve_by_excess_sigs(&remaining);
pub fn retrieve_by_excess_sigs(
&self,
excess_sigs: &[PrivateKey],
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
let (found_txns, remaining) = self.unconfirmed_pool.retrieve_by_excess_sigs(excess_sigs)?;

(
found_txns.into_iter().chain(found_published_transactions).collect(),
remaining,
)
match self.reorg_pool.retrieve_by_excess_sigs(&remaining) {
Ok((found_published_transactions, remaining)) => Ok((
found_txns.into_iter().chain(found_published_transactions).collect(),
remaining,
)),
Err(e) => Err(e),
}
}

/// Check if the specified excess signature is found in the Mempool.
Expand Down
20 changes: 9 additions & 11 deletions base_layer/core/src/mempool/reorg_pool/reorg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tari_utilities::hex::Hex;

use crate::{
blocks::Block,
mempool::shrink_hashmap::shrink_hashmap,
mempool::{shrink_hashmap::shrink_hashmap, MempoolError},
transactions::transaction_components::Transaction,
};

Expand Down Expand Up @@ -144,7 +144,10 @@ impl ReorgPool {
result
}

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
pub fn retrieve_by_excess_sigs(
&self,
excess_sigs: &[PrivateKey],
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
// Hashset used to prevent duplicates
let mut found = HashSet::new();
let mut remaining = Vec::new();
Expand All @@ -158,15 +161,10 @@ impl ReorgPool {

let found = found
.into_iter()
.map(|id| {
self.tx_by_key
.get(id)
.expect("mempool indexes out of sync: transaction exists in txs_by_signature but not in tx_by_key")
})
.cloned()
.collect();

(found, remaining)
.map(|id| self.tx_by_key.get(id).cloned().ok_or(MempoolError::IndexOutOfSync))
.collect::<Result<Vec<_>, _>>()?;

Ok((found, remaining))
}

/// Check if a transaction is stored in the ReorgPool
Expand Down
12 changes: 8 additions & 4 deletions base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
shrink_hashmap::shrink_hashmap,
unconfirmed_pool::UnconfirmedPoolError,
FeePerGramStat,
MempoolError,
},
transactions::{tari_amount::MicroMinotari, transaction_components::Transaction, weight::TransactionWeight},
};
Expand Down Expand Up @@ -401,7 +402,10 @@ impl UnconfirmedPool {
}
}

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
pub fn retrieve_by_excess_sigs(
&self,
excess_sigs: &[PrivateKey],
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
// Hashset used to prevent duplicates
let mut found = HashSet::new();
let mut remaining = Vec::new();
Expand All @@ -419,11 +423,11 @@ impl UnconfirmedPool {
self.tx_by_key
.get(&id)
.map(|tx| tx.transaction.clone())
.expect("mempool indexes out of sync: transaction exists in txs_by_signature but not in tx_by_key")
.ok_or(MempoolError::IndexOutOfSync)
})
.collect();
.collect::<Result<Vec<_>, _>>()?;

(found, remaining)
Ok((found, remaining))
}

fn get_all_dependent_transactions(
Expand Down

0 comments on commit afca199

Please sign in to comment.