From f2027139474745404150b9633cee0c5f8ea02b19 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Thu, 5 Dec 2024 13:33:45 +0200 Subject: [PATCH] improve sync handling --- src/server/p2p/network.rs | 10 ++- src/sharechain/in_memory.rs | 89 +++++++++++++----------- src/sharechain/p2chain.rs | 131 +++++++++++++++++++++++++++++++++++- 3 files changed, 184 insertions(+), 46 deletions(-) diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index 85363eac..b2c7215a 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -1705,9 +1705,13 @@ where S: ShareChain // } // let tx = self.inner_request_tx.clone(); - let i_have_blocks = share_chain - .create_catchup_sync_blocks(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE) - .await; + let i_have_blocks = if last_block_from_them.is_none() { + share_chain + .create_catchup_sync_blocks(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE) + .await + } else { + vec![] + }; info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] Sending catch up sync to {} for blocks {}, last block received {}. Their height:{}", algo, diff --git a/src/sharechain/in_memory.rs b/src/sharechain/in_memory.rs index f63a7ac2..a3f97727 100644 --- a/src/sharechain/in_memory.rs +++ b/src/sharechain/in_memory.rs @@ -332,34 +332,29 @@ impl InMemoryShareChain { }; loop { - for block in level.blocks.values() { - if main_chain_only { - if block.hash == level.chain_block { - for uncle in &block.uncles { - // Always include all the uncles, if we have them - if let Some(uncle_block) = - p2_chain.level_at_height(uncle.0).and_then(|l| l.blocks.get(&uncle.1)) - { - // Uncles should never exist in the main chain, so we don't need to worry about - // duplicates - res.push(uncle_block.clone()); - } + if main_chain_only { + if let Some(block) = level.block_in_main_chain() { + for uncle in &block.uncles { + // Always include all the uncles, if we have them + if let Some(uncle_block) = p2_chain.get_block_at_height(uncle.0, &uncle.1) { + // Uncles should never exist in the main chain, so we don't need to worry about + // duplicates + res.push(uncle_block.clone()); } - - num_actual_blocks += 1; - res.push(block.clone()); } - } else { num_actual_blocks += 1; res.push(block.clone()); } - // Always include at least 2 main chain blocks so that if we called - // this function with the starting mainchain block we can continue asking for more - // blocks - if num_actual_blocks > page_size { - return Ok(res); + } else { + for block in level.blocks.values() { + num_actual_blocks += 1; + res.push(block.clone()); } } + if num_actual_blocks > page_size { + return Ok(res); + } + level = if let Some(new_level) = p2_chain.level_at_height(level.height + 1) { new_level } else { @@ -680,21 +675,17 @@ impl ShareChain for InMemoryShareChain { ) -> Result<(Vec>, Option<(u64, FixedHash)>, AccumulatedDifficulty), ShareChainError> { let p2_chain_read = self.p2_chain.read().await; - // Assume their blocks are in order highest first. let mut split_height = 0; if let Some(last_block_received) = last_block_received { - if let Some(level) = p2_chain_read.level_at_height(last_block_received.0) { - if let Some(block) = level.blocks.get(&last_block_received.1) { - split_height = block.height.saturating_add(1); - } + if let Some(block) = p2_chain_read.get_block_at_height(last_block_received.0, &last_block_received.1) { + split_height = block.height.saturating_add(1); } } let mut their_blocks = their_blocks.to_vec(); // Highest to lowest their_blocks.sort_by(|a, b| b.0.cmp(&a.0)); - // their_blocks.reverse(); let mut split_height2 = 0; // Go back and find the split in the chain @@ -711,7 +702,7 @@ impl ShareChain for InMemoryShareChain { info!(target: LOG_TARGET, "[{:?}] Requesting sync, split_height1 {} splitheight2 {} last block {}", self.pow_algo, split_height, split_height2, last_block_received.as_ref().map(|(h, _)| h.to_string()).unwrap_or("None".to_string())); let blocks = - self.all_blocks_with_lock(&p2_chain_read, Some(cmp::max(split_height, split_height2)), limit, true)?; + self.all_blocks_with_lock(&p2_chain_read, Some(cmp::max(split_height, split_height2)), limit, true)?; // potential issue here, maybe this should be min let tip_level = p2_chain_read .get_tip() .map(|tip_level| (tip_level.height, tip_level.chain_block)); @@ -755,33 +746,51 @@ impl ShareChain for InMemoryShareChain { let mut i_have_blocks = Vec::with_capacity(size); if let Some(tip) = p2_chain_read_lock.get_tip() { let tip_height = tip.height; - let tip_hash = tip.chain_block; let mut height = tip_height; - let mut hash = tip_hash; for _ in 0..size { if let Some(level) = p2_chain_read_lock.level_at_height(height) { - let block = if let Some(block) = level.blocks.get(&hash) { + // if sync requestee only sees their behind on tip, they will fill in fixedhash::zero(), so it + // wont find this hash, so we return the current chain block + let block = if let Some(block) = level.block_in_main_chain() { block.clone() } else { - // if sync requestee only sees their behind on tip, they will fill in fixedhash::zero(), so it - // wont find this hash, so we return the curent chain block - if let Some(block) = level.block_in_main_chain() { - block.clone() - } else { - break; - } + break; }; i_have_blocks.push((height, block.hash)); if height == 0 { break; } - height = block.height - 1; - hash = block.hash; + height = block.height.saturating_sub(1); } else { break; } } + if i_have_blocks.len() < size { + // we have less blocks to fill up the request, not worth filling in older blocks + return i_have_blocks; + } + // lets replace some blocks with older ones so that it does not neet to sync the entire chain + let mut counter = 0; + let mut count_back = 3000; + while count_back > 100 { + let height = match tip_height.checked_sub(count_back) { + Some(h) => h, + None => { + count_back -= 250; + continue; + }, + }; + if let Some(level) = p2_chain_read_lock.level_at_height(height) { + if let Some(block) = level.block_in_main_chain() { + let index = i_have_blocks.len() - counter; + i_have_blocks[index] = (height, block.hash); + counter += 1; + } + } + count_back -= 250; + } } + i_have_blocks } } diff --git a/src/sharechain/p2chain.rs b/src/sharechain/p2chain.rs index cb991661..10691f24 100644 --- a/src/sharechain/p2chain.rs +++ b/src/sharechain/p2chain.rs @@ -157,7 +157,7 @@ impl P2Chain { .get(usize::try_from(index?).expect("32 bit systems not supported")) } - fn get_block_at_height(&self, height: u64, hash: &FixedHash) -> Option<&Arc> { + pub fn get_block_at_height(&self, height: u64, hash: &FixedHash) -> Option<&Arc> { let level = self.level_at_height(height)?; level.blocks.get(hash) } @@ -201,11 +201,11 @@ impl P2Chain { self.levels.len() as u64 >= self.total_size + SAFETY_MARGIN + MAX_EXTRA_SYNC } - fn cleanup_chain(&mut self) -> Result<(), ShareChainError>{ + fn cleanup_chain(&mut self) -> Result<(), ShareChainError> { let mut first_index = self.levels.back().map(|level| level.height).unwrap_or(0); let mut current_chain_length = self.current_tip.saturating_sub(first_index); // let see if we are the limit for the current chain - while current_chain_length > self.total_size + SAFETY_MARGIN{ + while current_chain_length > self.total_size + SAFETY_MARGIN { self.levels.pop_back().ok_or(ShareChainError::BlockLevelNotFound)?; first_index = self.levels.back().map(|level| level.height).unwrap_or(0); current_chain_length = self.current_tip.saturating_sub(first_index); @@ -1368,6 +1368,131 @@ mod test { chain.assert_share_window_verified(); } + #[test] + fn add_blocks_missing_block() { + // this test will verify that we reorg to a completely new chain + let mut chain = P2Chain::new_empty(50, 25, 20); + + let mut prev_block = None; + let mut tari_block = Block::new(BlockHeader::new(0), AggregateBody::empty()); + let mut blocks = Vec::new(); + for i in 0..50 { + tari_block.header.nonce = i; + let address = new_random_address(); + let block = P2BlockBuilder::new(prev_block.as_ref()) + .with_timestamp(EpochTime::now()) + .with_height(i) + .with_tari_block(tari_block.clone()) + .unwrap() + .with_miner_wallet_address(address.clone()) + .with_target_difficulty(Difficulty::from_u64(10).unwrap()) + .unwrap() + .build() + .unwrap(); + prev_block = Some(block.clone()); + blocks.push(block); + } + + for (i, block) in blocks.iter().enumerate().take(50) { + if i != 25 { + chain.add_block_to_chain(block.clone()).unwrap(); + } + } + assert_eq!(chain.current_tip, 24); + chain.add_block_to_chain(blocks[25].clone()).unwrap(); + + assert_eq!(chain.current_tip, 49); + assert_eq!(chain.get_tip().unwrap().chain_block, prev_block.unwrap().hash); + + chain.assert_share_window_verified(); + } + + #[test] + fn reorg_with_missing_uncle() { + // this test will verify that we reorg to a completely new chain + let mut chain = P2Chain::new_empty(50, 25, 20); + + let mut prev_block = None; + let mut tari_block = Block::new(BlockHeader::new(0), AggregateBody::empty()); + for i in 0..50 { + tari_block.header.nonce = i; + let address = new_random_address(); + let block = P2BlockBuilder::new(prev_block.as_ref()) + .with_timestamp(EpochTime::now()) + .with_height(i) + .with_tari_block(tari_block.clone()) + .unwrap() + .with_miner_wallet_address(address.clone()) + .with_target_difficulty(Difficulty::from_u64(10).unwrap()) + .unwrap() + .build() + .unwrap(); + prev_block = Some(block.clone()); + chain.add_block_to_chain(block).unwrap(); + } + + assert_eq!(chain.current_tip, 49); + assert_eq!(chain.get_tip().unwrap().chain_block, prev_block.unwrap().hash); + + let mut prev_block = None; + let mut tari_block = Block::new(BlockHeader::new(0), AggregateBody::empty()); + let mut uncle_parent = None; + let mut uncle_block = None; + for i in 0..50 { + tari_block.header.nonce = i + 100; + let address = new_random_address(); + let uncles = if i == 25 { + let uncle = P2BlockBuilder::new(uncle_parent.as_ref()) + .with_timestamp(EpochTime::now()) + .with_height(24) + .with_tari_block(tari_block.clone()) + .unwrap() + .with_miner_wallet_address(address.clone()) + .build() + .unwrap(); + uncle_block = Some(uncle.clone()); + vec![uncle] + } else { + vec![] + }; + let block = P2BlockBuilder::new(prev_block.as_ref()) + .with_timestamp(EpochTime::now()) + .with_height(i) + .with_tari_block(tari_block.clone()) + .unwrap() + .with_miner_wallet_address(address.clone()) + .with_uncles(&uncles) + .unwrap() + .with_target_difficulty(Difficulty::from_u64(11).unwrap()) + .unwrap() + .build() + .unwrap(); + if i == 23 { + uncle_parent = Some(block.clone()); + } + prev_block = Some(block.clone()); + chain.add_block_to_chain(block).unwrap(); + } + + assert_eq!(chain.current_tip, 49); + let hash = prev_block.unwrap().hash; + assert_ne!(chain.get_tip().unwrap().chain_block, hash); + chain.add_block_to_chain(uncle_block.unwrap()).unwrap(); + assert_eq!(chain.get_tip().unwrap().chain_block, hash); + assert_eq!( + chain + .get_tip() + .unwrap() + .block_in_main_chain() + .unwrap() + .original_header + .nonce, + 149 + ); + + chain.assert_share_window_verified(); + } + #[test] fn add_blocks_to_chain_super_large_reorg_only_window() { // this test will verify that we reorg to a completely new chain