improve sync handling
SWvheerden committed Dec 5, 2024
1 parent c3d0651 commit f202713
Showing 3 changed files with 184 additions and 46 deletions.
10 changes: 7 additions & 3 deletions src/server/p2p/network.rs
@@ -1705,9 +1705,13 @@ where S: ShareChain
// }

// let tx = self.inner_request_tx.clone();
let i_have_blocks = share_chain
.create_catchup_sync_blocks(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE)
.await;
let i_have_blocks = if last_block_from_them.is_none() {
share_chain
.create_catchup_sync_blocks(CATCH_UP_SYNC_BLOCKS_IN_I_HAVE)
.await
} else {
vec![]
};

info!(target: SYNC_REQUEST_LOG_TARGET, "[{:?}] Sending catch up sync to {} for blocks {}, last block received {}. Their height:{}",
algo,
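The gist of the network.rs change: the catch-up "I have" list is only built on the first round of a sync session, when no block has yet been received from the peer (`last_block_from_them.is_none()`); follow-up requests send an empty list, presumably to keep them small. A minimal sketch of that gating, using simplified stand-in types (`BlockIndex`, `CatchUpRequest` and `build_catch_up_request` are illustrative names, not the actual p2pool API):

```rust
// Simplified stand-in types; these names are illustrative, not the p2pool API.
type BlockIndex = (u64, [u8; 32]); // (height, block hash)

struct CatchUpRequest {
    i_have_blocks: Vec<BlockIndex>,
    last_block_from_them: Option<BlockIndex>,
}

/// Only advertise our "I have" blocks on the first round of a catch-up sync;
/// once the peer has already sent us a block, the list is redundant.
fn build_catch_up_request(
    our_recent_blocks: &[BlockIndex],
    last_block_from_them: Option<BlockIndex>,
    max_i_have: usize,
) -> CatchUpRequest {
    let i_have_blocks = if last_block_from_them.is_none() {
        our_recent_blocks.iter().copied().take(max_i_have).collect()
    } else {
        Vec::new()
    };
    CatchUpRequest {
        i_have_blocks,
        last_block_from_them,
    }
}
```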
89 changes: 49 additions & 40 deletions src/sharechain/in_memory.rs
@@ -332,34 +332,29 @@ impl InMemoryShareChain {
};

loop {
for block in level.blocks.values() {
if main_chain_only {
if block.hash == level.chain_block {
for uncle in &block.uncles {
// Always include all the uncles, if we have them
if let Some(uncle_block) =
p2_chain.level_at_height(uncle.0).and_then(|l| l.blocks.get(&uncle.1))
{
// Uncles should never exist in the main chain, so we don't need to worry about
// duplicates
res.push(uncle_block.clone());
}
if main_chain_only {
if let Some(block) = level.block_in_main_chain() {
for uncle in &block.uncles {
// Always include all the uncles, if we have them
if let Some(uncle_block) = p2_chain.get_block_at_height(uncle.0, &uncle.1) {
// Uncles should never exist in the main chain, so we don't need to worry about
// duplicates
res.push(uncle_block.clone());
}

num_actual_blocks += 1;
res.push(block.clone());
}
} else {
num_actual_blocks += 1;
res.push(block.clone());
}
// Always include at least 2 main chain blocks so that if we called
// this function with the starting mainchain block we can continue asking for more
// blocks
if num_actual_blocks > page_size {
return Ok(res);
} else {
for block in level.blocks.values() {
num_actual_blocks += 1;
res.push(block.clone());
}
}
if num_actual_blocks > page_size {
return Ok(res);
}

level = if let Some(new_level) = p2_chain.level_at_height(level.height + 1) {
new_level
} else {
@@ -680,21 +675,17 @@ impl ShareChain for InMemoryShareChain {
) -> Result<(Vec<Arc<P2Block>>, Option<(u64, FixedHash)>, AccumulatedDifficulty), ShareChainError> {
let p2_chain_read = self.p2_chain.read().await;

// Assume their blocks are in order highest first.
let mut split_height = 0;

if let Some(last_block_received) = last_block_received {
if let Some(level) = p2_chain_read.level_at_height(last_block_received.0) {
if let Some(block) = level.blocks.get(&last_block_received.1) {
split_height = block.height.saturating_add(1);
}
if let Some(block) = p2_chain_read.get_block_at_height(last_block_received.0, &last_block_received.1) {
split_height = block.height.saturating_add(1);
}
}

let mut their_blocks = their_blocks.to_vec();
// Highest to lowest
their_blocks.sort_by(|a, b| b.0.cmp(&a.0));
// their_blocks.reverse();

let mut split_height2 = 0;
// Go back and find the split in the chain
@@ -711,7 +702,7 @@ impl ShareChain for InMemoryShareChain {
info!(target: LOG_TARGET, "[{:?}] Requesting sync, split_height1 {} splitheight2 {} last block {}", self.pow_algo, split_height, split_height2, last_block_received.as_ref().map(|(h, _)| h.to_string()).unwrap_or("None".to_string()));

let blocks =
self.all_blocks_with_lock(&p2_chain_read, Some(cmp::max(split_height, split_height2)), limit, true)?;
self.all_blocks_with_lock(&p2_chain_read, Some(cmp::max(split_height, split_height2)), limit, true)?; // potential issue here, maybe this should be min
let tip_level = p2_chain_read
.get_tip()
.map(|tip_level| (tip_level.height, tip_level.chain_block));
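For context, the sync request picks its starting point from two candidate split heights: one past the block the peer last acknowledged, and one past the highest block in the peer's "I have" list that we also hold (the loop computing `split_height2` is collapsed in this diff, so the shape below is a guess). A hedged sketch under simplified types; the real code takes the `max` of the two, and the commit's own inline comment wonders whether `min` would be safer:

```rust
use std::collections::HashMap;

type Hash = [u8; 32];

/// Pick the height from which to start sending blocks to a syncing peer:
/// one past the block they last acknowledged, or one past the highest block
/// in their "I have" list that we also know about, whichever is larger.
fn find_split_height(
    known_blocks: &HashMap<u64, Vec<Hash>>, // height -> hashes we hold at that height
    last_block_received: Option<(u64, Hash)>,
    their_blocks: &mut Vec<(u64, Hash)>,
) -> u64 {
    let mut split_height = 0;
    if let Some((height, hash)) = last_block_received {
        if known_blocks.get(&height).map_or(false, |h| h.contains(&hash)) {
            split_height = height.saturating_add(1);
        }
    }

    // Walk their blocks from highest to lowest and stop at the first one we share.
    their_blocks.sort_by(|a, b| b.0.cmp(&a.0));
    let mut split_height2 = 0;
    for (height, hash) in their_blocks.iter() {
        if known_blocks.get(height).map_or(false, |h| h.contains(hash)) {
            split_height2 = height.saturating_add(1);
            break;
        }
    }

    split_height.max(split_height2)
}
```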
@@ -755,33 +746,51 @@ impl ShareChain for InMemoryShareChain {
let mut i_have_blocks = Vec::with_capacity(size);
if let Some(tip) = p2_chain_read_lock.get_tip() {
let tip_height = tip.height;
let tip_hash = tip.chain_block;
let mut height = tip_height;
let mut hash = tip_hash;
for _ in 0..size {
if let Some(level) = p2_chain_read_lock.level_at_height(height) {
let block = if let Some(block) = level.blocks.get(&hash) {
                    // if the sync requestee only sees that they are behind on the tip, they will fill in
                    // FixedHash::zero(), so it won't find this hash; we return the current chain block instead
let block = if let Some(block) = level.block_in_main_chain() {
block.clone()
} else {
                        // if the sync requestee only sees that they are behind on the tip, they will fill in
                        // FixedHash::zero(), so it won't find this hash; we return the current chain block instead
if let Some(block) = level.block_in_main_chain() {
block.clone()
} else {
break;
}
break;
};
i_have_blocks.push((height, block.hash));
if height == 0 {
break;
}
height = block.height - 1;
hash = block.hash;
height = block.height.saturating_sub(1);
} else {
break;
}
}
if i_have_blocks.len() < size {
                // we have fewer blocks than the requested size, so it is not worth filling in older blocks
return i_have_blocks;
}
            // let's replace some blocks with older ones so that the peer does not need to sync the entire chain
let mut counter = 0;
let mut count_back = 3000;
while count_back > 100 {
let height = match tip_height.checked_sub(count_back) {
Some(h) => h,
None => {
count_back -= 250;
continue;
},
};
if let Some(level) = p2_chain_read_lock.level_at_height(height) {
if let Some(block) = level.block_in_main_chain() {
                        let index = i_have_blocks.len() - 1 - counter;
i_have_blocks[index] = (height, block.hash);
counter += 1;
}
}
count_back -= 250;
}
}

i_have_blocks
}
}
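The interesting part of `create_catchup_sync_blocks` is how the "I have" list is filled: the most recent `size` main-chain blocks, with the tail of the list overwritten by sparse checkpoints spaced 250 heights apart and reaching back roughly 3000 blocks from the tip, so a peer that is far behind can still find a common block without the whole chain being advertised. A rough standalone sketch of that shape, with blocks reduced to bare heights (the helper name and the height-only simplification are mine, not the p2pool API):

```rust
/// Build an "I have" list for a catch-up sync: the most recent `size` main-chain
/// heights, with the tail of the list overwritten by sparse checkpoints reaching
/// back ~3000 blocks, so a peer that is far behind can still find a common block.
/// Blocks are reduced to bare heights here; the real code stores (height, hash).
fn build_i_have_list(tip_height: u64, size: usize) -> Vec<u64> {
    // Newest first: tip, tip - 1, tip - 2, ...
    let mut i_have: Vec<u64> = (0..size as u64)
        .filter_map(|i| tip_height.checked_sub(i))
        .collect();
    if i_have.len() < size {
        // The chain is shorter than the request; no point adding older checkpoints.
        return i_have;
    }

    // Replace the oldest entries with checkpoints spaced 250 blocks apart.
    let mut counter = 0;
    let mut count_back: u64 = 3000;
    while count_back > 100 {
        if let (Some(height), Some(index)) = (
            tip_height.checked_sub(count_back),
            i_have.len().checked_sub(1 + counter),
        ) {
            i_have[index] = height;
            counter += 1;
        }
        count_back -= 250;
    }
    i_have
}
```

For example, with a tip at height 10 000 and `size` of 20, the list would run from 10 000 down to 9 993, followed by checkpoints at 9 750, 9 500, and so on down to 7 000.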
131 changes: 128 additions & 3 deletions src/sharechain/p2chain.rs
@@ -157,7 +157,7 @@ impl P2Chain {
.get(usize::try_from(index?).expect("32 bit systems not supported"))
}

fn get_block_at_height(&self, height: u64, hash: &FixedHash) -> Option<&Arc<P2Block>> {
pub fn get_block_at_height(&self, height: u64, hash: &FixedHash) -> Option<&Arc<P2Block>> {
let level = self.level_at_height(height)?;
level.blocks.get(hash)
}
@@ -201,11 +201,11 @@ impl P2Chain {
self.levels.len() as u64 >= self.total_size + SAFETY_MARGIN + MAX_EXTRA_SYNC
}

fn cleanup_chain(&mut self) -> Result<(), ShareChainError>{
fn cleanup_chain(&mut self) -> Result<(), ShareChainError> {
let mut first_index = self.levels.back().map(|level| level.height).unwrap_or(0);
let mut current_chain_length = self.current_tip.saturating_sub(first_index);
        // let's see if we are at the limit for the current chain
while current_chain_length > self.total_size + SAFETY_MARGIN{
while current_chain_length > self.total_size + SAFETY_MARGIN {
self.levels.pop_back().ok_or(ShareChainError::BlockLevelNotFound)?;
first_index = self.levels.back().map(|level| level.height).unwrap_or(0);
current_chain_length = self.current_tip.saturating_sub(first_index);
@@ -1368,6 +1368,131 @@ mod test {
chain.assert_share_window_verified();
}

#[test]
fn add_blocks_missing_block() {
        // this test will verify that the tip catches up once a previously missing block is added
let mut chain = P2Chain::new_empty(50, 25, 20);

let mut prev_block = None;
let mut tari_block = Block::new(BlockHeader::new(0), AggregateBody::empty());
let mut blocks = Vec::new();
for i in 0..50 {
tari_block.header.nonce = i;
let address = new_random_address();
let block = P2BlockBuilder::new(prev_block.as_ref())
.with_timestamp(EpochTime::now())
.with_height(i)
.with_tari_block(tari_block.clone())
.unwrap()
.with_miner_wallet_address(address.clone())
.with_target_difficulty(Difficulty::from_u64(10).unwrap())
.unwrap()
.build()
.unwrap();
prev_block = Some(block.clone());
blocks.push(block);
}

for (i, block) in blocks.iter().enumerate().take(50) {
if i != 25 {
chain.add_block_to_chain(block.clone()).unwrap();
}
}
assert_eq!(chain.current_tip, 24);
chain.add_block_to_chain(blocks[25].clone()).unwrap();

assert_eq!(chain.current_tip, 49);
assert_eq!(chain.get_tip().unwrap().chain_block, prev_block.unwrap().hash);

chain.assert_share_window_verified();
}

#[test]
fn reorg_with_missing_uncle() {
        // this test will verify that we only reorg to the new chain once its missing uncle arrives
let mut chain = P2Chain::new_empty(50, 25, 20);

let mut prev_block = None;
let mut tari_block = Block::new(BlockHeader::new(0), AggregateBody::empty());
for i in 0..50 {
tari_block.header.nonce = i;
let address = new_random_address();
let block = P2BlockBuilder::new(prev_block.as_ref())
.with_timestamp(EpochTime::now())
.with_height(i)
.with_tari_block(tari_block.clone())
.unwrap()
.with_miner_wallet_address(address.clone())
.with_target_difficulty(Difficulty::from_u64(10).unwrap())
.unwrap()
.build()
.unwrap();
prev_block = Some(block.clone());
chain.add_block_to_chain(block).unwrap();
}

assert_eq!(chain.current_tip, 49);
assert_eq!(chain.get_tip().unwrap().chain_block, prev_block.unwrap().hash);

let mut prev_block = None;
let mut tari_block = Block::new(BlockHeader::new(0), AggregateBody::empty());
let mut uncle_parent = None;
let mut uncle_block = None;
for i in 0..50 {
tari_block.header.nonce = i + 100;
let address = new_random_address();
let uncles = if i == 25 {
let uncle = P2BlockBuilder::new(uncle_parent.as_ref())
.with_timestamp(EpochTime::now())
.with_height(24)
.with_tari_block(tari_block.clone())
.unwrap()
.with_miner_wallet_address(address.clone())
.build()
.unwrap();
uncle_block = Some(uncle.clone());
vec![uncle]
} else {
vec![]
};
let block = P2BlockBuilder::new(prev_block.as_ref())
.with_timestamp(EpochTime::now())
.with_height(i)
.with_tari_block(tari_block.clone())
.unwrap()
.with_miner_wallet_address(address.clone())
.with_uncles(&uncles)
.unwrap()
.with_target_difficulty(Difficulty::from_u64(11).unwrap())
.unwrap()
.build()
.unwrap();
if i == 23 {
uncle_parent = Some(block.clone());
}
prev_block = Some(block.clone());
chain.add_block_to_chain(block).unwrap();
}

assert_eq!(chain.current_tip, 49);
let hash = prev_block.unwrap().hash;
assert_ne!(chain.get_tip().unwrap().chain_block, hash);
chain.add_block_to_chain(uncle_block.unwrap()).unwrap();
assert_eq!(chain.get_tip().unwrap().chain_block, hash);
assert_eq!(
chain
.get_tip()
.unwrap()
.block_in_main_chain()
.unwrap()
.original_header
.nonce,
149
);

chain.assert_share_window_verified();
}

#[test]
fn add_blocks_to_chain_super_large_reorg_only_window() {
// this test will verify that we reorg to a completely new chain
