fix: better semaphore use to ensure syncing from only one peer (#207)
stringhandler authored Dec 5, 2024
1 parent c3d0651 · commit 02a8bfb
Showing 2 changed files with 32 additions and 20 deletions.
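The core of the fix: a tokio `OwnedSemaphorePermit` is threaded through `PerformCatchUpSync`, so a follow-up catch-up sync keeps the permit it already holds instead of dropping it and re-acquiring, which previously left a window where a second peer could grab the sync slot between rounds. A minimal standalone sketch of that pattern, assuming tokio's `Semaphore` with one permit (the function and loop here are illustrative, not code from this repo):

```rust
use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// At most one peer may be catch-up synced at a time: a single-permit gate.
// A follow-up round reuses the permit it already holds instead of dropping
// it and racing other peers to re-acquire the slot.
async fn sync_round(
    gate: Arc<Semaphore>,
    held: Option<OwnedSemaphorePermit>,
) -> Option<OwnedSemaphorePermit> {
    let permit = match held {
        // Follow-up sync: keep the permit from the previous round.
        Some(p) => p,
        // First round: take the slot only if nobody else is syncing.
        None => match gate.try_acquire_owned() {
            Ok(p) => p,
            Err(_) => {
                println!("another peer is already syncing, skipping");
                return None;
            },
        },
    };
    println!("syncing one batch of blocks from the peer...");
    Some(permit) // hand the permit back for the next round; dropping it releases the slot
}

#[tokio::main]
async fn main() {
    let gate = Arc::new(Semaphore::new(1));
    let mut held = None;
    for _ in 0..3 {
        held = sync_round(gate.clone(), held).await;
    }
    drop(held); // catch-up finished: release the slot
}
```

Because `OwnedSemaphorePermit` releases its slot when dropped, handing the permit back out of each round is what keeps the single sync slot occupied until the whole catch-up is finished.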
src/server/p2p/network.rs (48 changes: 30 additions & 18 deletions)
@@ -115,7 +115,7 @@ pub const STABLE_PRIVATE_KEY_FILE: &str = "p2pool_private.key";
 const MAX_ACCEPTABLE_P2P_MESSAGE_TIMEOUT: Duration = Duration::from_millis(500);
 const MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT: Duration = Duration::from_millis(100);
 const CATCH_UP_SYNC_BLOCKS_IN_I_HAVE: usize = 100;
-const MAX_CATCH_UP_ATTEMPTS: u64 = 150;
+const MAX_CATCH_UP_ATTEMPTS: u64 = 500;
 // Time to start up and catch up before we start processing new tip messages
 const NUM_PEERS_TO_SYNC_PER_ALGO: usize = 32;
 const NUM_PEERS_INITIAL_SYNC: usize = 100;
@@ -209,6 +209,7 @@ struct PerformCatchUpSync {
     pub peer: PeerId,
     pub last_block_from_them: Option<(u64, FixedHash)>,
     pub their_height: u64,
+    pub permit: Option<OwnedSemaphorePermit>,
 }

 #[derive(NetworkBehaviour)]
@@ -886,25 +887,27 @@ where S: ShareChain

         let our_tip_sha3x = self.share_chain_sha3x.chain_pow().await;

-        if response.info.current_sha3x_pow > our_tip_sha3x.as_u128() {
+        if self.config.sha3x_enabled && response.info.current_sha3x_pow > our_tip_sha3x.as_u128() {
             let perform_catch_up_sync = PerformCatchUpSync {
                 algo: PowAlgorithm::Sha3x,
                 peer: peer_id,
                 last_block_from_them: None,
                 their_height: response.info.current_sha3x_height,
+                permit: None,
             };
             let _unused = self
                 .inner_request_tx
                 .send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
         }
         let our_tip_rx = self.share_chain_random_x.chain_pow().await;

-        if response.info.current_random_x_pow > our_tip_rx.as_u128() {
+        if self.config.randomx_enabled && response.info.current_random_x_pow > our_tip_rx.as_u128() {
             let perform_catch_up_sync = PerformCatchUpSync {
                 algo: PowAlgorithm::RandomX,
                 peer: peer_id,
                 last_block_from_them: None,
                 their_height: response.info.current_random_x_height,
+                permit: None,
             };
             let _unused = self
                 .inner_request_tx
@@ -1309,15 +1312,15 @@ where S: ShareChain
                     self.handle_catch_up_sync_request(channel, request).await;
                 },
                 request_response::Message::Response { request_id, response } => {
+                    let permit = self.release_catchup_sync_permit(request_id);
                     match response {
                         Ok(response) => {
-                            self.handle_catch_up_sync_response(response).await;
+                            self.handle_catch_up_sync_response(response, permit).await;
                         },
                         Err(error) => {
                             error!(target: LOG_TARGET, squad = &self.config.squad; "REQ-RES catch up sync response error: {error:?}");
                         },
                     }
-                    self.release_catchup_sync_permit(request_id);
                 },
             },
             request_response::Event::OutboundFailure {
@@ -1477,16 +1480,15 @@ where S: ShareChain
         }
     }

-    fn release_catchup_sync_permit(&mut self, request_id: OutboundRequestId) {
+    fn release_catchup_sync_permit(&mut self, request_id: OutboundRequestId) -> Option<OwnedSemaphorePermit> {
         let should_remove: Vec<PeerId> = self
             .randomx_in_progress_syncs
             .iter()
             .filter_map(|(peer, r)| if r.0 == request_id { Some(*peer) } else { None })
             .collect();
         for peer in should_remove {
             if let Some((_r, permit)) = self.randomx_in_progress_syncs.remove(&peer) {
-                // Probably don't need to do this
-                drop(permit);
+                return Some(permit);
             }
         }

@@ -1497,13 +1499,17 @@
             .collect();
         for peer in should_remove {
             if let Some((_r, permit)) = self.sha3x_in_progress_syncs.remove(&peer) {
-                // Probably don't need to do this
-                drop(permit);
+                return Some(permit);
             }
         }
+        None
     }

-    async fn handle_catch_up_sync_response(&mut self, response: CatchUpSyncResponse) {
+    async fn handle_catch_up_sync_response(
+        &mut self,
+        response: CatchUpSyncResponse,
+        permit: Option<OwnedSemaphorePermit>,
+    ) {
         debug!(target: MESSAGE_LOGGING_LOG_TARGET, "Catch up sync response: {response:?}");
         if response.version != PROTOCOL_VERSION {
             trace!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", response.peer_id());
@@ -1584,6 +1590,7 @@ where S: ShareChain
                 peer,
                 last_block_from_them,
                 their_height,
+                permit,
             };
             let _unused = tx.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
         } else {
@@ -1658,6 +1665,7 @@ where S: ShareChain
             peer,
             last_block_from_them,
             their_height,
+            mut permit,
         } = perform_catch_up_sync;

         // First check if we have a sync in progress for this peer.
@@ -1685,13 +1693,17 @@
         // return Ok(());
         // }

-        let permit = match semaphore.try_acquire_owned() {
-            Ok(permit) => permit,
-            Err(_) => {
-                warn!(target: SYNC_REQUEST_LOG_TARGET, "Could not acquire semaphore for catch up sync");
-                return Ok(());
-            },
-        };
+        if permit.is_none() {
+            permit = match semaphore.try_acquire_owned() {
+                Ok(permit) => Some(permit),
+                Err(_) => {
+                    warn!(target: SYNC_REQUEST_LOG_TARGET, "Could not acquire semaphore for catch up sync for peer: {}", peer);
+                    return Ok(());
+                },
+            };
+        }
+
+        let permit = permit.unwrap();

         // Only allow follow on catch up syncs if we've tried to sync from them recently
         // if last_block_from_them.is_none() &&
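In short, `release_catchup_sync_permit` now returns the recovered permit instead of dropping it, and the response handler passes it into the follow-up sync request. A simplified sketch of that release path, with assumed key types (the real code keys the in-progress maps by `PeerId` and stores `(OutboundRequestId, OwnedSemaphorePermit)` pairs):

```rust
use std::collections::HashMap;

use tokio::sync::OwnedSemaphorePermit;

// Simplified release path: recover the permit held by a finished request and
// hand it to the caller instead of dropping it. The caller either threads it
// into the follow-up PerformCatchUpSync or drops it to free the slot.
struct InProgressSyncs {
    // request_id -> permit held for that sync.
    syncs: HashMap<u64, OwnedSemaphorePermit>,
}

impl InProgressSyncs {
    fn release(&mut self, request_id: u64) -> Option<OwnedSemaphorePermit> {
        // remove() transfers ownership of the permit out of the map; nothing
        // is dropped here, so the semaphore slot stays held until the caller
        // decides to drop it.
        self.syncs.remove(&request_id)
    }
}
```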
src/sharechain/p2chain.rs (4 changes: 2 additions & 2 deletions)
@@ -201,11 +201,11 @@ impl P2Chain {
         self.levels.len() as u64 >= self.total_size + SAFETY_MARGIN + MAX_EXTRA_SYNC
     }

-    fn cleanup_chain(&mut self) -> Result<(), ShareChainError>{
+    fn cleanup_chain(&mut self) -> Result<(), ShareChainError> {
         let mut first_index = self.levels.back().map(|level| level.height).unwrap_or(0);
         let mut current_chain_length = self.current_tip.saturating_sub(first_index);
         // let see if we are the limit for the current chain
-        while current_chain_length > self.total_size + SAFETY_MARGIN{
+        while current_chain_length > self.total_size + SAFETY_MARGIN {
             self.levels.pop_back().ok_or(ShareChainError::BlockLevelNotFound)?;
             first_index = self.levels.back().map(|level| level.height).unwrap_or(0);
             current_chain_length = self.current_tip.saturating_sub(first_index);
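The `p2chain.rs` hunk is formatting only, but for context, the loop it touches trims the oldest share-chain levels until the span from the current tip down to the oldest stored height fits the size budget. A self-contained sketch under assumed types (a `VecDeque` of levels, newest at the front, oldest at the back):

```rust
use std::collections::VecDeque;

// Hypothetical reduction of cleanup_chain: `levels` holds one entry per block
// height, newest at the front, and the retained span is measured from the
// current tip down to the oldest stored height.
struct Level {
    height: u64,
}

fn cleanup_chain(levels: &mut VecDeque<Level>, current_tip: u64, total_size: u64, safety_margin: u64) {
    let mut first_index = levels.back().map(|l| l.height).unwrap_or(0);
    let mut current_chain_length = current_tip.saturating_sub(first_index);
    // Trim the oldest levels until the chain fits total_size + safety margin.
    while current_chain_length > total_size + safety_margin {
        if levels.pop_back().is_none() {
            break; // the real code returns ShareChainError::BlockLevelNotFound here
        }
        first_index = levels.back().map(|l| l.height).unwrap_or(0);
        current_chain_length = current_tip.saturating_sub(first_index);
    }
}
```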
