Skip to content

[PoC] include a delay in block broadcasting to mitigate late blocks #2120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions node/src/actors/chain_manager/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,10 @@ fn log_sync_progress(
impl Handler<AddCandidates> for ChainManager {
type Result = SessionUnitResult;

fn handle(&mut self, msg: AddCandidates, _ctx: &mut Context<Self>) {
fn handle(&mut self, msg: AddCandidates, ctx: &mut Context<Self>) {
// AddCandidates is needed in all states
for block in msg.blocks {
self.process_candidate(block);
for (block, ts) in msg.blocks {
self.process_candidate(ctx, block, ts);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions node/src/actors/chain_manager/mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use witnet_data_structures::{
};
use witnet_futures_utils::TryFutureExt2;
use witnet_rad::{error::RadError, types::serial_iter_decode};
use witnet_util::timestamp::get_timestamp;
use witnet_util::timestamp::{get_timestamp, get_timestamp_nanos};
use witnet_validations::validations::{
block_reward, calculate_liars_and_errors_count_from_tally, calculate_mining_probability,
calculate_randpoe_threshold, calculate_reppoe_threshold, dr_transaction_fee, merkle_tree_root,
Expand Down Expand Up @@ -250,7 +250,7 @@ impl ChainManager {
beacon,
epoch_constants,
)
.map_ok(|_diff, act, _ctx| {
.map_ok(|_diff, act, ctx| {
// Send AddCandidates message to self
// This will run all the validations again

Expand All @@ -263,7 +263,7 @@ impl ChainManager {
Yellow.bold().paint(block_hash.to_string())
);

act.process_candidate(block);
act.process_candidate(ctx, block, get_timestamp_nanos());
})
.map_err(|e, _, _| log::error!("Error trying to mine a block: {}", e))
})
Expand Down
81 changes: 73 additions & 8 deletions node/src/actors/chain_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use witnet_data_structures::{
};

use witnet_rad::types::RadonTypes;
use witnet_util::timestamp::seconds_to_human_string;
use witnet_util::timestamp::{duration_between_timestamps, seconds_to_human_string};
use witnet_validations::validations::{
compare_block_candidates, validate_block, validate_block_transactions,
validate_new_transaction, validate_rad_request, verify_signatures, VrfSlots,
Expand Down Expand Up @@ -515,7 +515,7 @@ impl ChainManager {
}

#[allow(clippy::map_entry)]
fn process_candidate(&mut self, block: Block) {
fn process_candidate(&mut self, ctx: &mut Context<Self>, block: Block, ts: (i64, u32)) {
if let (Some(current_epoch), Some(chain_info), Some(rep_engine), Some(vrf_ctx)) = (
self.current_epoch,
self.chain_state.chain_info.as_ref(),
Expand Down Expand Up @@ -551,6 +551,20 @@ impl ChainManager {
return;
}

// Calculate delay for broadcasting blocks
let delay = if let Some(delay) = calculate_delay_for_broadcasting_block(
chain_info.consensus_constants.checkpoint_zero_timestamp,
chain_info.consensus_constants.checkpoints_period,
current_epoch,
ts,
) {
delay
} else {
log::debug!("Block received too late to broadcasting");

return;
};

let mut vrf_input = chain_info.highest_vrf_output;
vrf_input.checkpoint = current_epoch;
let active_wips = ActiveWips {
Expand Down Expand Up @@ -582,7 +596,9 @@ impl ChainManager {
// In order to do not block possible validate candidates in AlmostSynced
// state, we would broadcast the errors too
if self.sm_state == StateMachine::AlmostSynced {
self.broadcast_item(InventoryItem::Block(block));
ctx.run_later(delay, |act, _ctx| {
act.broadcast_item(InventoryItem::Block(block))
});
}

return;
Expand Down Expand Up @@ -646,7 +662,9 @@ impl ChainManager {
vrf_proof,
});

self.broadcast_item(InventoryItem::Block(block));
ctx.run_later(delay, |act, _ctx| {
act.broadcast_item(InventoryItem::Block(block))
});
}
Err(e) => {
log::warn!(
Expand All @@ -658,7 +676,9 @@ impl ChainManager {
// In order to do not block possible validate candidates in AlmostSynced
// state, we would broadcast the errors too
if self.sm_state == StateMachine::AlmostSynced {
self.broadcast_item(InventoryItem::Block(block));
ctx.run_later(delay, |act, _ctx| {
act.broadcast_item(InventoryItem::Block(block))
});
}
}
}
Expand Down Expand Up @@ -2207,6 +2227,51 @@ impl ChainManager {
}
}

// Auxiliary function that converts one delay in another
fn delay_function(initial_delay: Duration) -> Duration {
// TODO: Apply a right delay function
// Remove 7.5 secs to the delay
initial_delay.saturating_sub(Duration::new(7, 500000000))
}

// Calculate the delay to introduce in block broadcasting
// Returns None in case of overflow the current epoch duration
#[allow(clippy::cast_sign_loss)]
fn calculate_delay_for_broadcasting_block(
checkpoint_zero_timestamp: i64,
checkpoints_period: u16,
current_epoch: Epoch,
ts: (i64, u32),
) -> Option<Duration> {
let epoch_constants = EpochConstants {
checkpoint_zero_timestamp,
checkpoints_period,
};

// Calculate delay between mining block timestamp and another timestamp
let timestamp_mining = epoch_constants
.block_mining_timestamp(current_epoch)
.unwrap();
let delay_from_mining_ts = duration_between_timestamps((timestamp_mining, 0), ts)
.unwrap_or_else(|| Duration::from_secs(0));

// Apply magic delay function
let delay_to_broadcasting = delay_function(delay_from_mining_ts);

// Return delay only if is before the end of the epoch
let end_epoch_ts = epoch_constants
.epoch_timestamp(current_epoch + 1)
.unwrap_or(i64::MAX) as u64;
let ts_with_delay = Duration::new(ts.0 as u64, ts.1).checked_add(delay_to_broadcasting);

match ts_with_delay {
Some(ts_with_delay) if ts_with_delay.as_secs() < end_epoch_ts => {
Some(delay_to_broadcasting)
}
_ => None,
}
}

/// Helper struct used to persist an old copy of the `ChainState` to the storage
#[derive(Debug, Default)]
struct ChainStateSnapshot {
Expand Down Expand Up @@ -3528,19 +3593,19 @@ mod tests {
assert_ne!(block_1, block_mal_1);

// Process the modified candidate first
chain_manager.process_candidate(block_mal_1);
chain_manager.process_candidate(&mut Context::new(), block_mal_1, (0, 0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know it was possible to call Context::new()... But maybe it's better to send an AddCandidates message? This is inside an async block so we can just call send().await.

// The best candidate should be None because this block is invalid
let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block);
assert_eq!(best_cand, None);

// Process candidate with the same hash, but this one is valid
chain_manager.process_candidate(block_1.clone());
chain_manager.process_candidate(&mut Context::new(), block_1.clone(), (0, 0));
// The best candidate should be block_1
let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block);
assert_eq!(best_cand, Some(&block_1));

// Process another valid candidate, but worse than the other one
chain_manager.process_candidate(block_2);
chain_manager.process_candidate(&mut Context::new(), block_2, (0, 0));
// The best candidate should still be block_1
let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block);
assert_eq!(best_cand, Some(&block_1));
Expand Down
4 changes: 3 additions & 1 deletion node/src/actors/json_rpc/json_rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::actors::messages::GetSupplyInfo;
use futures::FutureExt;
use futures_util::compat::Compat;
use std::future::Future;
use witnet_util::timestamp::get_timestamp_nanos;

type JsonRpcResult = Result<Value, jsonrpc_core::Error>;

Expand Down Expand Up @@ -433,9 +434,10 @@ pub async fn inventory(params: Result<InventoryItem, jsonrpc_core::Error>) -> Js
log::debug!("Got block from JSON-RPC. Sending AnnounceItems message.");

let chain_manager_addr = ChainManager::from_registry();
let now = get_timestamp_nanos();
let res = chain_manager_addr
.send(AddCandidates {
blocks: vec![block],
blocks: vec![(block, now)],
})
.await;

Expand Down
2 changes: 1 addition & 1 deletion node/src/actors/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Message for AddBlocks {
/// Add a new candidate
pub struct AddCandidates {
/// Candidates
pub blocks: Vec<Block>,
pub blocks: Vec<(Block, (i64, u32))>,
}

impl Message for AddCandidates {
Expand Down
6 changes: 4 additions & 2 deletions node/src/actors/session/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::actors::{
sessions_manager::SessionsManager,
};

use witnet_util::timestamp::get_timestamp;
use witnet_util::timestamp::{get_timestamp, get_timestamp_nanos};

#[derive(Debug, Eq, Fail, PartialEq)]
enum HandshakeError {
Expand Down Expand Up @@ -710,10 +710,12 @@ fn inventory_process_block(session: &mut Session, _ctx: &mut Context<Session>, b
// requested_block_hashes is cleared by using drain(..) above
}
} else {
let ts = get_timestamp_nanos();

// If this is not a requested block, assume it is a candidate
// Send a message to the ChainManager to try to add a new candidate
chain_manager_addr.do_send(AddCandidates {
blocks: vec![block],
blocks: vec![(block, ts)],
});
}
}
Expand Down