Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Nov 27, 2024
1 parent eaa0306 commit 19e9764
Showing 1 changed file with 35 additions and 24 deletions.
59 changes: 35 additions & 24 deletions base_layer/core/src/base_node/tari_pulse_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{str::FromStr, time::Duration};
use std::{cmp::max, str::FromStr, time::Duration};

use futures::future;
use hickory_client::{
Expand All @@ -33,13 +33,13 @@ use hickory_client::{
rr::{DNSClass, Name, RData, Record, RecordType},
tcp::TcpClientStream,
};
use log::{error, info, warn};
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use tari_p2p::Network;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time};
use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time, time::MissedTickBehavior};

use super::LocalNodeCommsInterface;
use crate::base_node::comms_interface::CommsInterfaceError;
Expand Down Expand Up @@ -121,31 +121,37 @@ impl TariPulseService {
notify_passed_checkpoints: watch::Sender<bool>,
) {
let mut interval = time::interval(self.config.check_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
tokio::pin!(interval);
let mut shutdown_signal = self.shutdown_signal.clone();
let mut count = 0u64;
let mut skip_ticks = 0;
let mut skipped_ticks = 0;

loop {
tokio::select! {
_ = interval.tick() => {
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => {
interval = time::interval(self.config.check_interval); // reset interval if back to healthy
passed
},
Err(err) => {
warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
let old_interval = interval.period().as_secs();
let new_interval = if old_interval > (60 * 30) {
warn!(target: LOG_TARGET, "Reached maximum retry interval of 30 minutes.");
old_interval
} else {
// increase interval if node repeatedly (up to 30 min) fails to fetch checkpoints
interval = time::interval(Duration::from_secs(old_interval * 2));
interval.tick().await;
interval.period().as_secs()
};
warn!(target: LOG_TARGET, "Retrying in {} seconds", new_interval);
continue;
},
count += 1;
trace!(target: LOG_TARGET, "Interval tick: {}", count);
if skipped_ticks < skip_ticks {
skipped_ticks += 1;
debug!(target: LOG_TARGET, "Skipping {} of {} ticks", skipped_ticks, skip_ticks);
continue;
}
let passed_checkpoints = {
match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => {
skip_ticks = 0;
skipped_ticks = 0;
passed
},
Err(err) => {
warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
skip_ticks = max(skip_ticks + 1, 30 * 60 / self.config.check_interval.as_secs());
skipped_ticks = 0;
continue;
},
}
};

notify_passed_checkpoints
Expand Down Expand Up @@ -174,7 +180,12 @@ impl TariPulseService {
.max_by(|a, b| a.0.cmp(&b.0))
.ok_or(CommsInterfaceError::InternalError("No checkpoints found".to_string()))?;
let local_checkpoints = self.get_node_block(base_node_service, max_height_block.0).await?;
Ok(local_checkpoints.1 == max_height_block.1)
let passed = local_checkpoints.1 == max_height_block.1;
trace!(
target: LOG_TARGET, "Passed checkpoints: {}, DNS: ({}, {}), Local: ({}, {})",
passed, max_height_block.0, max_height_block.1, local_checkpoints.0, local_checkpoints.1
);
Ok(passed)
}

async fn get_node_block(
Expand Down

0 comments on commit 19e9764

Please sign in to comment.