From 0610bcceb5debb91484191f6178e2591db123263 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Ko=C5=BCuszek?= Date: Tue, 19 Nov 2024 12:00:13 +0100 Subject: [PATCH] feat: validate blockchain fetch from DNS server with local state (#6658) Description --- Add new service which will periodically get checkpoints from a DNS server and validate with it's own state. If we fail this check we will include this information in `get_new_block_template` request so the sender is aware that this node is out of sync. Motivation and Context --- This will allow miners including one from Tari Universe to avoid mining on the orphan chain. This implementation is modeled after the [Monero Pulse](https://docs.getmonero.org/infrastructure/monero-pulse/#moneropulse) How Has This Been Tested? --- Run app on the nextnet and look for logs with `tari_pulse` in the `base_layer.log` file like this: `tail -f base_layer.log | grep tari_pulse ` Make `get_new_block_template` and verify if new fields are present. Create orphan chain (eg. by turning off tor and banning all TCP peers) and check (with above methods) if nodes detects orphan chain. What process can a PR reviewer use to test or verify this change? --- Same as above Breaking Changes --- - [x] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [ ] Other - Please specify --------- Co-authored-by: SW van Heerden --- .license.ignore | 2 + Cargo.lock | 3 + .../minotari_app_grpc/proto/base_node.proto | 1 + applications/minotari_node/src/bootstrap.rs | 11 +- applications/minotari_node/src/builder.rs | 11 +- applications/minotari_node/src/config.rs | 3 + .../src/grpc/base_node_grpc_server.rs | 5 + base_layer/core/Cargo.toml | 2 + base_layer/core/src/base_node/mod.rs | 3 + .../base_node/tari_pulse_service/20326.rsa | 1 + .../base_node/tari_pulse_service/38696.rsa | 1 + .../src/base_node/tari_pulse_service/mod.rs | 245 ++++++++++++++++++ common/config/presets/c_base_node_c.toml | 3 + common/src/exit_codes.rs | 2 + 14 files changed, 288 insertions(+), 5 deletions(-) create mode 100644 base_layer/core/src/base_node/tari_pulse_service/20326.rsa create mode 100644 base_layer/core/src/base_node/tari_pulse_service/38696.rsa create mode 100644 base_layer/core/src/base_node/tari_pulse_service/mod.rs diff --git a/.license.ignore b/.license.ignore index 9af3bb095f..5df15eb5ef 100644 --- a/.license.ignore +++ b/.license.ignore @@ -2,6 +2,8 @@ ./applications/minotari_node/assets/tari_logo.rs ./applications/minotari_node/osx-pkg/entitlements.xml ./base_layer/contacts/src/schema.rs +./base_layer/core/src/base_node/tari_pulse_service/20326.rsa +./base_layer/core/src/base_node/tari_pulse_service/38696.rsa ./base_layer/key_manager/src/schema.rs ./base_layer/wallet/src/schema.rs ./docs/src/theme/book.js diff --git a/Cargo.lock b/Cargo.lock index 9e58b5e9a6..5779a4aac1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2573,6 +2573,7 @@ dependencies = [ "idna 0.5.0", "ipnet", "once_cell", + "openssl", "rand", "ring", "rustls", @@ -6629,6 +6630,7 @@ dependencies = [ name = "tari_core" version = "1.9.0-pre.0" dependencies = [ + "anyhow", "async-trait", "bincode", "bitflags 2.4.1", @@ -6647,6 +6649,7 @@ dependencies = [ "fs2", "futures 0.3.29", "hex", + "hickory-client", "integer-encoding", "libsqlite3-sys", "lmdb-zero", diff --git a/applications/minotari_app_grpc/proto/base_node.proto b/applications/minotari_app_grpc/proto/base_node.proto index 3e75dad936..9bceb0fd0c 100644 --- a/applications/minotari_app_grpc/proto/base_node.proto +++ b/applications/minotari_app_grpc/proto/base_node.proto @@ -160,6 +160,7 @@ message TipInfoResponse { MetaData metadata = 1; bool initial_sync_achieved = 2; BaseNodeState base_node_state = 3; + bool failed_checkpoints = 4; } enum BaseNodeState{ diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index 6620e7bd07..7c680a11f0 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -38,18 +38,18 @@ use tari_comms::{ }; use tari_comms_dht::Dht; use tari_core::{ - base_node, base_node::{ + self, chain_metadata_service::ChainMetadataServiceInitializer, service::BaseNodeServiceInitializer, state_machine_service::initializer::BaseNodeStateMachineInitializer, + tari_pulse_service::TariPulseServiceInitializer, LocalNodeCommsInterface, StateMachineHandle, }, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, BlockchainDatabase}, consensus::ConsensusManager, - mempool, - mempool::{service::MempoolHandle, Mempool, MempoolServiceInitializer, MempoolSyncInitializer}, + mempool::{self, service::MempoolHandle, Mempool, MempoolServiceInitializer, MempoolSyncInitializer}, proof_of_work::randomx_factory::RandomXFactory, transactions::CryptoFactories, }; @@ -170,6 +170,10 @@ where B: BlockchainBackend + 'static self.randomx_factory, self.app_config.base_node.bypass_range_proof_verification, )) + .add_initializer(TariPulseServiceInitializer::new( + base_node_config.tari_pulse_interval, + base_node_config.network, + )) .build() .await?; @@ -221,7 +225,6 @@ where B: BlockchainBackend + 'static }; handles.register(comms); - Ok(handles) } diff --git a/applications/minotari_node/src/builder.rs b/applications/minotari_node/src/builder.rs index 171372b04d..6b2792e2e7 100644 --- a/applications/minotari_node/src/builder.rs +++ b/applications/minotari_node/src/builder.rs @@ -30,7 +30,12 @@ use tari_common::{ use tari_comms::{peer_manager::NodeIdentity, protocol::rpc::RpcServerHandle, CommsNode}; use tari_comms_dht::Dht; use tari_core::{ - base_node::{state_machine_service::states::StatusInfo, LocalNodeCommsInterface, StateMachineHandle}, + base_node::{ + state_machine_service::states::StatusInfo, + tari_pulse_service::TariPulseHandle, + LocalNodeCommsInterface, + StateMachineHandle, + }, chain_storage::{create_lmdb_database, BlockchainDatabase, ChainStorageError, LMDBDatabase, Validators}, consensus::ConsensusManager, mempool::{service::LocalMempoolService, Mempool}, @@ -121,6 +126,10 @@ impl BaseNodeContext { self.base_node_handles.expect_handle() } + pub fn tari_pulse(&self) -> TariPulseHandle { + self.base_node_handles.expect_handle() + } + /// Returns a handle to the comms RPC server pub fn rpc_server(&self) -> RpcServerHandle { self.base_node_handles.expect_handle() diff --git a/applications/minotari_node/src/config.rs b/applications/minotari_node/src/config.rs index 01de596230..f6b704801b 100644 --- a/applications/minotari_node/src/config.rs +++ b/applications/minotari_node/src/config.rs @@ -142,6 +142,8 @@ pub struct BaseNodeConfig { pub state_machine: BaseNodeStateMachineConfig, /// Obscure GRPC error responses pub report_grpc_error: bool, + // Interval to check if the base node is still in sync with the network + pub tari_pulse_interval: Duration, } impl Default for BaseNodeConfig { @@ -180,6 +182,7 @@ impl Default for BaseNodeConfig { metadata_auto_ping_interval: Duration::from_secs(30), state_machine: Default::default(), report_grpc_error: false, + tari_pulse_interval: Duration::from_secs(120), } } } diff --git a/applications/minotari_node/src/grpc/base_node_grpc_server.rs b/applications/minotari_node/src/grpc/base_node_grpc_server.rs index ce991bad8e..1b5f2c548d 100644 --- a/applications/minotari_node/src/grpc/base_node_grpc_server.rs +++ b/applications/minotari_node/src/grpc/base_node_grpc_server.rs @@ -45,6 +45,7 @@ use tari_core::{ base_node::{ comms_interface::CommsInterfaceError, state_machine_service::states::StateInfo, + tari_pulse_service::TariPulseHandle, LocalNodeCommsInterface, StateMachineHandle, }, @@ -114,6 +115,7 @@ pub struct BaseNodeGrpcServer { comms: CommsNode, liveness: LivenessHandle, report_grpc_error: bool, + tari_pulse: TariPulseHandle, config: BaseNodeConfig, } @@ -129,6 +131,7 @@ impl BaseNodeGrpcServer { comms: ctx.base_node_comms().clone(), liveness: ctx.liveness(), report_grpc_error: ctx.get_report_grpc_error(), + tari_pulse: ctx.tari_pulse(), config, } } @@ -1637,6 +1640,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { trace!(target: LOG_TARGET, "Incoming GRPC request for BN tip data"); let mut handler = self.node_service.clone(); + let failed_checkpoints = *self.tari_pulse.get_failed_checkpoints_notifier(); let meta = handler .get_metadata() @@ -1650,6 +1654,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { metadata: Some(meta.into()), initial_sync_achieved: status_watch.borrow().bootstrapped, base_node_state: state.into(), + failed_checkpoints, }; trace!(target: LOG_TARGET, "Sending MetaData response to client"); diff --git a/base_layer/core/Cargo.toml b/base_layer/core/Cargo.toml index ab77e26c36..24e1393311 100644 --- a/base_layer/core/Cargo.toml +++ b/base_layer/core/Cargo.toml @@ -93,6 +93,8 @@ tiny-keccak = { package = "tari-tiny-keccak", version = "2.0.2", features = [ "keccak", ] } dirs-next = "1.0.2" +hickory-client = { version = "0.25.0-alpha.2", features = ["dns-over-rustls", "dnssec-openssl"] } +anyhow = "1.0.53" [dev-dependencies] criterion = { version = "0.4.0" } diff --git a/base_layer/core/src/base_node/mod.rs b/base_layer/core/src/base_node/mod.rs index c7a37b6524..776fc59925 100644 --- a/base_layer/core/src/base_node/mod.rs +++ b/base_layer/core/src/base_node/mod.rs @@ -65,3 +65,6 @@ pub mod proto; #[cfg(any(feature = "base_node", feature = "base_node_proto"))] pub mod rpc; + +#[cfg(feature = "base_node")] +pub mod tari_pulse_service; diff --git a/base_layer/core/src/base_node/tari_pulse_service/20326.rsa b/base_layer/core/src/base_node/tari_pulse_service/20326.rsa new file mode 100644 index 0000000000..acd48e8bd6 --- /dev/null +++ b/base_layer/core/src/base_node/tari_pulse_service/20326.rsa @@ -0,0 +1 @@ +AwEAAaz/tAm8yTn4Mfeh5eyI96WSVexTBAvkMgJzkKTOiW1vkIbzxeF3+/4RgWOq7HrxRixHlFlExOLAJr5emLvN7SWXgnLh4+B5xQlNVz8Og8kvArMtNROxVQuCaSnIDdD5LKyWbRd2n9WGe2R8PzgCmr3EgVLrjyBxWezF0jLHwVN8efS3rCj/EWgvIWgb9tarpVUDK/b58Da+sqqls3eNbuv7pr+eoZG+SrDK6nWeL3c6H5Apxz7LjVc1uTIdsIXxuOLYA4/ilBmSVIzuDWfdRUfhHdY6+cn8HFRm+2hM8AnXGXws9555KrUB5qihylGa8subX2Nn6UwNR1AkUTV74bU= \ No newline at end of file diff --git a/base_layer/core/src/base_node/tari_pulse_service/38696.rsa b/base_layer/core/src/base_node/tari_pulse_service/38696.rsa new file mode 100644 index 0000000000..7948d28a47 --- /dev/null +++ b/base_layer/core/src/base_node/tari_pulse_service/38696.rsa @@ -0,0 +1 @@ +AwEAAa96jeuknZlaeSrvyAJj6ZHv28hhOKkx3rLGXVaC6rXTsDc449/cidltpkyGwCJNnOAlFNKF2jBosZBU5eeHspaQWOmOElZsjICMQMC3aeHbGiShvZsx4wMYSjH8e7Vrhbu6irwCzVBApESjbUdpWWmEnhathWu1jo+siFUiRAAxm9qyJNg/wOZqqzL/dL/q8PkcRU5oUKEpUge71M3ej2/7CPqpdVwuMoTvoB+ZOT4YeGyxMvHmbrxlFzGOHOijtzN+u1TQNatX2XBuzZNQ1K+s2CXkPIZo7s6JgZyvaBevYtxPvYLw4z9mR7K2vaF18UYH9Z9GNUUeayffKC73PYc= \ No newline at end of file diff --git a/base_layer/core/src/base_node/tari_pulse_service/mod.rs b/base_layer/core/src/base_node/tari_pulse_service/mod.rs new file mode 100644 index 0000000000..20de1f4339 --- /dev/null +++ b/base_layer/core/src/base_node/tari_pulse_service/mod.rs @@ -0,0 +1,245 @@ +// Copyright 2024. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{str::FromStr, time::Duration}; + +use futures::future; +use hickory_client::{ + client::{AsyncDnssecClient, ClientHandle}, + proto::{ + iocompat::AsyncIoTokioAsStd, + rr::dnssec::{public_key::Rsa, SigSigner, TrustAnchor}, + xfer::DnsMultiplexer, + }, + rr::{DNSClass, Name, RData, Record, RecordType}, + tcp::TcpClientStream, +}; +use log::{error, info}; +use serde::{Deserialize, Serialize}; +use tari_p2p::Network; +use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext}; +use tari_shutdown::ShutdownSignal; +use tari_utilities::hex::Hex; +use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time}; + +use super::LocalNodeCommsInterface; +use crate::base_node::comms_interface::CommsInterfaceError; + +const LOG_TARGET: &str = "c::bn::tari_pulse"; +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct TariPulseConfig { + pub check_interval: Duration, + pub network: Network, +} + +impl Default for TariPulseConfig { + fn default() -> Self { + Self { + check_interval: Duration::from_secs(120), + network: Network::default(), + } + } +} + +fn get_network_dns_name(network: Network) -> Name { + match network { + Network::NextNet => Name::from_str("checkpoints-nextnet.tari.com").expect("infallible"), + Network::MainNet => Name::from_str("checkpoints-mainnet.tari.com").expect("infallible"), + Network::Esmeralda => Name::from_str("checkpoints-esmeralda.tari.com").expect("infallible"), + Network::StageNet => Name::from_str("checkpoints-stagenet.tari.com").expect("infallible"), + Network::Igor => Name::from_str("checkpoints-igor.tari.com").expect("infallible"), + Network::LocalNet => Name::from_str("checkpoints-localnet.tari.com").expect("infallible"), + } +} + +pub struct TariPulseService { + dns_name: Name, + config: TariPulseConfig, +} + +impl TariPulseService { + pub async fn new(config: TariPulseConfig) -> Result { + let dns_name: Name = get_network_dns_name(config.clone().network); + info!(target: LOG_TARGET, "Tari Pulse Service initialized with DNS name: {}", dns_name); + Ok(Self { dns_name, config }) + } + + pub fn default_trust_anchor() -> TrustAnchor { + const ROOT_ANCHOR_ORIG: &[u8] = include_bytes!("20326.rsa"); + const ROOT_ANCHOR_CURRENT: &[u8] = include_bytes!("38696.rsa"); + + let mut anchor = TrustAnchor::new(); + anchor.insert_trust_anchor(&Rsa::from_public_bytes(ROOT_ANCHOR_ORIG).expect("Invalid ROOT_ANCHOR_ORIG")); + anchor.insert_trust_anchor(&Rsa::from_public_bytes(ROOT_ANCHOR_CURRENT).expect("Invalid ROOT_ANCHOR_CURRENT")); + anchor + } + + async fn get_dns_client(&self) -> Result { + let timeout: Duration = Duration::from_secs(5); + let trust_anchor = Self::default_trust_anchor(); + + let (stream, handle) = TcpClientStream::>::new(([1, 1, 1, 1], 53).into()); + let dns_muxer = DnsMultiplexer::<_, SigSigner>::with_timeout(stream, handle, timeout, None); + let (client, bg) = AsyncDnssecClient::builder(dns_muxer) + .trust_anchor(trust_anchor) + .build() + .await?; + + tokio::spawn(bg); + + Ok(client) + } + + pub async fn run( + &mut self, + mut base_node_service: LocalNodeCommsInterface, + notify_passed_checkpoints: watch::Sender, + ) { + let mut interval = time::interval(self.config.check_interval); + let mut interval_failed = time::interval(Duration::from_millis(100)); + loop { + let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await { + Ok(passed) => passed, + Err(err) => { + error!(target: LOG_TARGET, "Error checking if node passed checkpoints: {:?}", err); + interval_failed.tick().await; + continue; + }, + }; + + notify_passed_checkpoints + .send(!passed_checkpoints) + .expect("Channel should be open"); + interval.tick().await; + } + } + + async fn passed_checkpoints( + &mut self, + base_node_service: &mut LocalNodeCommsInterface, + ) -> Result { + let dns_checkpoints = self.fetch_checkpoints().await?; + + let max_height_block = dns_checkpoints + .iter() + .max_by(|a, b| a.0.cmp(&b.0)) + .ok_or(CommsInterfaceError::InternalError("No checkpoints found".to_string()))?; + let local_checkpoints = self.get_node_block(base_node_service, max_height_block.0).await?; + Ok(local_checkpoints.1 == max_height_block.1) + } + + async fn get_node_block( + &mut self, + base_node_service: &mut LocalNodeCommsInterface, + block_height: u64, + ) -> Result<(u64, String), anyhow::Error> { + let historical_block = base_node_service + .get_header(block_height) + .await + .and_then(|header| match header { + Some(header) => Ok((header.height(), header.hash().to_hex())), + None => { + error!(target: LOG_TARGET, "Header not found for height: {}", block_height); + Err(CommsInterfaceError::InternalError("Header not found".to_string())) + }, + })?; + + Ok(historical_block) + } + + async fn fetch_checkpoints(&mut self) -> Result, anyhow::Error> { + let mut client = self.get_dns_client().await?; + let query = client.query(self.dns_name.clone(), DNSClass::IN, RecordType::TXT); + let response = query.await?; + let answers: &[Record] = response.answers(); + let checkpoints: Vec<(u64, String)> = answers + .iter() + .filter_map(|record| { + if let RData::TXT(txt) = record.data() { + let ascii_txt = txt.txt_data().iter().fold(String::new(), |mut acc, bytes| { + acc.push_str(&String::from_utf8_lossy(bytes)); + acc + }); + let (height, hash) = ascii_txt.split_once(':')?; + return Some((height.parse().ok()?, hash.to_string())); + } + None + }) + .collect(); + + Ok(checkpoints) + } +} + +#[derive(Clone)] +pub struct TariPulseHandle { + pub shutdown_signal: ShutdownSignal, + pub failed_checkpoints_notifier: watch::Receiver, +} + +impl TariPulseHandle { + pub fn get_failed_checkpoints_notifier(&self) -> watch::Ref<'_, bool> { + self.failed_checkpoints_notifier.borrow() + } +} + +pub struct TariPulseServiceInitializer { + interval: Duration, + network: Network, +} + +impl TariPulseServiceInitializer { + pub fn new(interval: Duration, network: Network) -> Self { + Self { interval, network } + } +} + +#[async_trait] +impl ServiceInitializer for TariPulseServiceInitializer { + async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> { + info!(target: LOG_TARGET, "Initializing Tari Pulse Service"); + let shutdown_signal = context.get_shutdown_signal(); + let (sender, receiver) = watch::channel(false); + context.register_handle(TariPulseHandle { + shutdown_signal: shutdown_signal.clone(), + failed_checkpoints_notifier: receiver, + }); + let config = TariPulseConfig { + check_interval: self.interval, + network: self.network, + }; + + context.spawn_when_ready(move |handles| async move { + let base_node_service = handles.expect_handle::(); + let mut tari_pulse_service = TariPulseService::new(config) + .await + .expect("Should be able to get the service"); + let tari_pulse_service = tari_pulse_service.run(base_node_service, sender); + futures::pin_mut!(tari_pulse_service); + future::select(tari_pulse_service, shutdown_signal).await; + info!(target: LOG_TARGET, "Tari Pulse Service shutdown"); + }); + info!(target: LOG_TARGET, "Tari Pulse Service initialized"); + Ok(()) + } +} diff --git a/common/config/presets/c_base_node_c.toml b/common/config/presets/c_base_node_c.toml index 8aeeec7c23..fe7c1e5ea3 100644 --- a/common/config/presets/c_base_node_c.toml +++ b/common/config/presets/c_base_node_c.toml @@ -45,6 +45,9 @@ # Obscure GRPC error responses (default = false) #report_grpc_error = false +# Interval between each request to the dns server for hte checkpoints to compare it with the local blockchain (default = 120 s) +# tari_pulse_interval = 120 + [base_node.lmdb] #init_size_bytes = 16_777_216 # 16 *1024 * 1024 #grow_size_bytes = 16_777_216 # 16 *1024 * 1024 diff --git a/common/src/exit_codes.rs b/common/src/exit_codes.rs index 412afe54e2..c11b64deb7 100644 --- a/common/src/exit_codes.rs +++ b/common/src/exit_codes.rs @@ -130,6 +130,8 @@ pub enum ExitCode { WalletPaymentAddress = 123, #[error("Unable to configure TLS")] TlsConfigurationError = 124, + #[error("Unable to setup tari pulse")] + TariPulseError = 125, } impl From for ExitError {