Skip to content

Commit

Permalink
Await shutdown signal in main loop
Browse files Browse the repository at this point in the history
  • Loading branch information
MCozhusheck committed Nov 20, 2024
1 parent 0610bcc commit a3a124a
Showing 1 changed file with 42 additions and 17 deletions.
59 changes: 42 additions & 17 deletions base_layer/core/src/base_node/tari_pulse_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use hickory_client::{
rr::{DNSClass, Name, RData, Record, RecordType},
tcp::TcpClientStream,
};
use log::{error, info};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use tari_p2p::Network;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
Expand Down Expand Up @@ -75,13 +75,18 @@ fn get_network_dns_name(network: Network) -> Name {
pub struct TariPulseService {
dns_name: Name,
config: TariPulseConfig,
shutdown_signal: ShutdownSignal,
}

impl TariPulseService {
pub async fn new(config: TariPulseConfig) -> Result<Self, anyhow::Error> {
pub async fn new(config: TariPulseConfig, shutdown_signal: ShutdownSignal) -> Result<Self, anyhow::Error> {
let dns_name: Name = get_network_dns_name(config.clone().network);
info!(target: LOG_TARGET, "Tari Pulse Service initialized with DNS name: {}", dns_name);
Ok(Self { dns_name, config })
Ok(Self {
dns_name,
config,
shutdown_signal,
})
}

pub fn default_trust_anchor() -> TrustAnchor {
Expand Down Expand Up @@ -116,21 +121,41 @@ impl TariPulseService {
notify_passed_checkpoints: watch::Sender<bool>,
) {
let mut interval = time::interval(self.config.check_interval);
let mut interval_failed = time::interval(Duration::from_millis(100));
let mut interval_failed = time::interval(Duration::from_secs(60));
let mut shutdown_signal = self.shutdown_signal.clone();

loop {
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => passed,
Err(err) => {
error!(target: LOG_TARGET, "Error checking if node passed checkpoints: {:?}", err);
interval_failed.tick().await;
continue;
},
};
tokio::select! {
_ = interval.tick() => {
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => passed,
Err(err) => {
error!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
let interval_in_secs = interval_failed.period().as_secs();
if interval_in_secs > (60 * 30) {
warn!(target: LOG_TARGET, "Reached maximum retry interval of 30 minutes. Exiting");
break;
}
error!(target: LOG_TARGET, "Retrying in {} minutes", interval_in_secs/60);
interval_failed.tick().await;
interval_failed.tick().await;
interval_failed = time::interval(Duration::from_secs(interval_in_secs * 2));
continue;
},
};

notify_passed_checkpoints
.send(!passed_checkpoints)
.expect("Channel should be open");
interval.tick().await;
notify_passed_checkpoints
.send(!passed_checkpoints)
.expect("Channel should be open");
},
_ = shutdown_signal.wait() => {
info!(
target: LOG_TARGET,
"Tari Pulse shutting down because the shutdown signal was received"
);
break;
},
}
}
}

Expand Down Expand Up @@ -231,7 +256,7 @@ impl ServiceInitializer for TariPulseServiceInitializer {

context.spawn_when_ready(move |handles| async move {
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let mut tari_pulse_service = TariPulseService::new(config)
let mut tari_pulse_service = TariPulseService::new(config, shutdown_signal.clone())
.await
.expect("Should be able to get the service");
let tari_pulse_service = tari_pulse_service.run(base_node_service, sender);
Expand Down

0 comments on commit a3a124a

Please sign in to comment.