refactor(state-sync): Request sync block in client actor before starting state sync (#10175)

We only use this logic for state sync when the node starts with old blocks.
If the node has been running during the epoch change, it is supposed to
already have the block, so it does not need to request it during state sync for catchup.
VanBarbascu authored Dec 7, 2023
1 parent abf6e99 commit 27b518a
Showing 3 changed files with 86 additions and 98 deletions.
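
The substance of the change is the sync-block check that now runs in the client actor before state sync starts. The following is a rough, self-contained sketch of that decision logic only, not the code from this commit: it substitutes std::time types for the chrono and Chain types used in the diff below, and the parameter names are illustrative stand-ins for the real fields.

use std::time::{Duration, Instant};

/// Decide whether the "sync block" (the `prev_hash` block of `sync_hash`) must be
/// (re)requested from peers, and whether we already have it locally.
/// Returns (request_block, have_block), mirroring the helper added below.
fn sync_block_status(
    block_exists_locally: bool,      // stands in for chain.block_exists(prev_hash)?
    last_requested: Option<Instant>, // stands in for client.last_time_sync_block_requested
    now: Instant,
    timeout: Duration,               // stands in for config.state_sync_timeout
) -> (bool, bool) {
    if block_exists_locally {
        // Block already on disk: nothing to request, state sync may complete.
        return (false, true);
    }
    match last_requested {
        // Never requested yet: ask for it now.
        None => (true, false),
        // Requested before, but the timeout elapsed: ask again.
        Some(t) if now.duration_since(t) >= timeout => (true, false),
        // Request still in flight: keep waiting.
        Some(_) => (false, false),
    }
}

fn main() {
    let now = Instant::now();
    let timeout = Duration::from_secs(60);
    assert_eq!(sync_block_status(true, None, now, timeout), (false, true));
    assert_eq!(sync_block_status(false, None, now, timeout), (true, false));
    assert_eq!(sync_block_status(false, Some(now), now, timeout), (false, false));
}

In the actual diff, when request_block is true the actor asks a random highest-height peer for both the prev block and the sync block, and state sync is not reported as Completed until have_block is true.
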
13 changes: 7 additions & 6 deletions chain/client/src/client.rs
@@ -13,6 +13,8 @@ use crate::SyncAdapter;
use crate::SyncMessage;
use crate::{metrics, SyncStatus};
use actix_rt::ArbiterHandle;
use chrono::DateTime;
use chrono::Utc;
use itertools::Itertools;
use lru::LruCache;
use near_async::messaging::{CanSend, Sender};
@@ -178,6 +180,10 @@ pub struct Client {
tier1_accounts_cache: Option<(EpochId, Arc<AccountKeys>)>,
/// Used when it is needed to create flat storage in background for some shards.
flat_storage_creator: Option<FlatStorageCreator>,

/// When the "sync block" was requested.
/// The "sync block" is the last block of the previous epoch, i.e. `prev_hash` of the `sync_hash` block.
pub last_time_sync_block_requested: Option<DateTime<Utc>>,
}

impl Client {
@@ -360,6 +366,7 @@ impl Client {
chunk_production_info: lru::LruCache::new(PRODUCTION_TIMES_CACHE_SIZE),
tier1_accounts_cache: None,
flat_storage_creator,
last_time_sync_block_requested: None,
})
}

@@ -2326,12 +2333,6 @@ impl Client {
self.runtime_adapter.clone(),
)? {
StateSyncResult::InProgress => {}
StateSyncResult::RequestBlock => {
// here RequestBlock should not be returned, because the StateSyncInfos in
// self.chain.store().iterate_state_sync_infos() should have been stored by
// Chain::postprocess_block() on the block with hash sync_hash.
panic!("catchup state sync indicates sync block isn't on our chain")
}
StateSyncResult::Completed => {
debug!(target: "catchup", "state sync completed now catch up blocks");
self.chain.catchup_blocks_step(
108 changes: 74 additions & 34 deletions chain/client/src/client_actor.rs
@@ -1640,25 +1640,17 @@ impl ClientActor {
notify_start_sync = true;
}
};
let state_sync_status = match &mut self.client.sync_status {
SyncStatus::StateSync(s) => s,
_ => unreachable!(),
let sync_hash = match &self.client.sync_status {
SyncStatus::StateSync(s) => s.sync_hash,
_ => unreachable!("Sync status should have been StateSync!"),
};

let me =
self.client.validator_signer.as_ref().map(|x| x.validator_id().clone());
let block_header = unwrap_and_report!(self
.client
.chain
.get_block_header(&state_sync_status.sync_hash));
let block_header =
unwrap_and_report!(self.client.chain.get_block_header(&sync_hash));
let prev_hash = *block_header.prev_hash();
let epoch_id = self
.client
.chain
.get_block_header(&state_sync_status.sync_hash)
.unwrap()
.epoch_id()
.clone();
let epoch_id = block_header.epoch_id().clone();
let shards_to_sync: Vec<_> = self
.client
.epoch_manager
@@ -1681,7 +1673,6 @@

// Notify each shard to sync.
if notify_start_sync {
let sync_hash = state_sync_status.sync_hash;
let shard_layout = self
.client
.epoch_manager
@@ -1706,9 +1697,39 @@
}
}

let now = StaticClock::utc();

// FIXME: it checks if the block exists.. but I have no idea why..
// seems that we don't really use this block in case of catchup - we use it only for state sync.
// Seems it is related to some bug with block getting orphaned after state sync? but not sure.
let (request_block, have_block) =
unwrap_and_report!(self.sync_block_status(&prev_hash, now));

if request_block {
self.client.last_time_sync_block_requested = Some(now);
if let Some(peer_info) =
self.network_info.highest_height_peers.choose(&mut thread_rng())
{
let id = peer_info.peer_info.id.clone();

for hash in
vec![*block_header.prev_hash(), *block_header.hash()].into_iter()
{
self.client.request_block(hash, id.clone());
}
}
}
if have_block {
self.client.last_time_sync_block_requested = None;
}

let state_sync_status = match &mut self.client.sync_status {
SyncStatus::StateSync(s) => s,
_ => unreachable!("Sync status should have been StateSync!"),
};
match unwrap_and_report!(self.client.state_sync.run(
&me,
state_sync_status.sync_hash,
sync_hash,
&mut state_sync_status.sync_status,
&mut self.client.chain,
self.client.epoch_manager.as_ref(),
@@ -1721,31 +1742,18 @@
self.client.runtime_adapter.clone(),
)) {
StateSyncResult::InProgress => (),
StateSyncResult::RequestBlock => {
if let Some(peer_info) =
self.network_info.highest_height_peers.choose(&mut thread_rng())
{
let id = peer_info.peer_info.id.clone();

if let Ok(header) =
self.client.chain.get_block_header(&state_sync_status.sync_hash)
{
for hash in
vec![*header.prev_hash(), *header.hash()].into_iter()
{
self.client.request_block(hash, id.clone());
}
}
}
}
StateSyncResult::Completed => {
if !have_block {
trace!(target: "sync", "Sync done. Waiting for sync block.");
return;
}
info!(target: "sync", "State sync: all shards are done");

let mut block_processing_artifacts = BlockProcessingArtifact::default();

unwrap_and_report!(self.client.chain.reset_heads_post_state_sync(
&me,
state_sync_status.sync_hash,
sync_hash,
&mut block_processing_artifacts,
self.get_apply_chunks_done_callback(),
));
@@ -1764,6 +1772,38 @@
}
}

/// Verifies whether the node possesses the sync block.
/// It is the last block of the previous epoch.
/// If the block is absent, the node requests it from peers.
fn sync_block_status(
&self,
prev_hash: &CryptoHash,
now: DateTime<Utc>,
) -> Result<(bool, bool), near_chain::Error> {
let (request_block, have_block) = if !self.client.chain.block_exists(prev_hash)? {
let timeout =
chrono::Duration::from_std(self.client.config.state_sync_timeout).unwrap();
match self.client.last_time_sync_block_requested {
None => (true, false),
Some(last_time) => {
if (now - last_time) >= timeout {
tracing::error!(
target: "sync",
%prev_hash,
?timeout,
"State sync: block request timed out");
(true, false)
} else {
(false, false)
}
}
}
} else {
(false, true)
};
Ok((request_block, have_block))
}

/// Print current summary.
fn log_summary(&mut self) {
let _span = tracing::debug_span!(target: "client", "log_summary").entered();
63 changes: 5 additions & 58 deletions chain/client/src/sync/state.rs
@@ -73,8 +73,6 @@ pub const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300;
pub enum StateSyncResult {
/// State sync still in progress. No action needed by the caller.
InProgress,
/// The client needs to start fetching the block
RequestBlock,
/// The state for all shards was downloaded.
Completed,
}
@@ -134,10 +132,6 @@ pub struct StateSync {
/// Is used for communication with the peers.
network_adapter: PeerManagerAdapter,

/// When the "sync block" was requested.
/// The "sync block" is the last block of the previous epoch, i.e. `prev_hash` of the `sync_hash` block.
last_time_block_requested: Option<DateTime<Utc>>,

/// Timeout (set in config - by default to 60 seconds) is used to figure out how long we should wait
/// for the answer from the other node before giving up.
timeout: Duration,
@@ -205,7 +199,6 @@ impl StateSync {
StateSync {
inner,
network_adapter,
last_time_block_requested: None,
timeout,
state_parts_apply_results: HashMap::new(),
split_state_roots: HashMap::new(),
@@ -214,38 +207,6 @@ impl StateSync {
}
}

fn sync_block_status(
&mut self,
prev_hash: &CryptoHash,
chain: &Chain,
now: DateTime<Utc>,
) -> Result<(bool, bool), near_chain::Error> {
let (request_block, have_block) = if !chain.block_exists(prev_hash)? {
match self.last_time_block_requested {
None => (true, false),
Some(last_time) => {
if now - last_time >= self.timeout {
tracing::error!(
target: "sync",
%prev_hash,
timeout_sec = self.timeout.num_seconds(),
"State sync: block request timed out");
(true, false)
} else {
(false, false)
}
}
}
} else {
self.last_time_block_requested = None;
(false, true)
};
if request_block {
self.last_time_block_requested = Some(now);
};
Ok((request_block, have_block))
}

// The return value indicates whether state sync is
// finished, in which case the client will transition to block sync
fn sync_shards_status(
@@ -711,27 +672,13 @@ impl StateSync {
) -> Result<StateSyncResult, near_chain::Error> {
let _span = tracing::debug_span!(target: "sync", "run", sync = "StateSync").entered();
tracing::trace!(target: "sync", %sync_hash, ?tracking_shards, "syncing state");
let prev_hash = *chain.get_block_header(&sync_hash)?.prev_hash();
let now = StaticClock::utc();

// FIXME: it checks if the block exists.. but I have no idea why..
// seems that we don't really use this block in case of catchup - we use it only for state sync.
// Seems it is related to some bug with block getting orphaned after state sync? but not sure.
let (request_block, have_block) = self.sync_block_status(&prev_hash, chain, now)?;

if tracking_shards.is_empty() {
// This case is possible if a validator cares about the same shards in the new epoch as
// in the previous (or about a subset of them), return success right away

return if !have_block {
if request_block {
Ok(StateSyncResult::RequestBlock)
} else {
Ok(StateSyncResult::InProgress)
}
} else {
Ok(StateSyncResult::Completed)
};
return Ok(StateSyncResult::Completed);
}
// The downloaded parts are from all shards. This function takes all downloaded parts and
// saves them to the DB.
@@ -753,11 +700,11 @@ impl StateSync {
runtime_adapter,
)?;

if have_block && all_done {
return Ok(StateSyncResult::Completed);
if all_done {
Ok(StateSyncResult::Completed)
} else {
Ok(StateSyncResult::InProgress)
}

Ok(if request_block { StateSyncResult::RequestBlock } else { StateSyncResult::InProgress })
}

pub fn update_download_on_state_response_message(
