Commit

small updates
ksrichard committed Jul 2, 2024
1 parent dbf1cf6 commit 0e0f363
Showing 7 changed files with 60 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -12,6 +12,7 @@ edition = "2021"
minotari_app_grpc = { git = "https://github.com/ksrichard/tari.git", branch = "p2pool" }
minotari_node_grpc_client = { git = "https://github.com/ksrichard/tari.git", branch = "p2pool" }
tari_common_types = { git = "https://github.com/ksrichard/tari.git", branch = "p2pool" }
tari_common = { git = "https://github.com/ksrichard/tari.git", branch = "p2pool" }
tari_core = { git = "https://github.com/ksrichard/tari.git", branch = "p2pool" }

tari_utilities = { version = "0.7", features = ["borsh"] }
6 changes: 6 additions & 0 deletions Cross.toml
@@ -0,0 +1,6 @@
[target.x86_64-unknown-linux-gnu]
pre-build = [
"dpkg --add-architecture $CROSS_DEB_ARCH",
"apt-get update",
"apt-get install --no-install-recommends --assume-yes apt-transport-https ca-certificates curl gpg bash less openssl libssl-dev pkg-config libsqlite3-dev:$CROSS_DEB_ARCH libsqlite3-0:$CROSS_DEB_ARCH libreadline-dev git cmake dh-autoreconf clang g++ libc++-dev libc++abi-dev libprotobuf-dev protobuf-compiler:$CROSS_DEB_ARCH libncurses5-dev libncursesw5-dev libudev-dev libhidapi-dev zip"
]
14 changes: 7 additions & 7 deletions src/main.rs
@@ -1,7 +1,7 @@
use std::path::PathBuf;

use clap::builder::styling::AnsiColor;
use clap::builder::Styles;
use clap::builder::styling::AnsiColor;
use clap::Parser;
use env_logger::Builder;
use log::LevelFilter;
@@ -66,13 +66,13 @@ struct Cli {
)]
private_key_folder: PathBuf,

/// Mining enabled
/// Mining disabled
///
/// In case it is set to false, the node will only handle p2p operations,
/// In case it is set, the node will only handle p2p operations,
/// will be syncing with share chain, but not starting gRPC services and no Tari base node needed.
/// By setting this to, false it can be used as a stable node for routing only.
#[arg(long, value_name = "mining-enabled", default_value_t = true)]
mining_enabled: bool,
/// By setting this it can be used as a stable node for routing only.
#[arg(long, value_name = "mining-disabled", default_value_t = false)]
mining_disabled: bool,
}

#[tokio::main]
@@ -92,7 +92,7 @@ async fn main() -> anyhow::Result<()> {
}
config_builder.with_stable_peer(cli.stable_peer);
config_builder.with_private_key_folder(cli.private_key_folder);
config_builder.with_mining_enabled(cli.mining_enabled);
config_builder.with_mining_enabled(!cli.mining_disabled);

// server start
let config = config_builder.build();
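For context on the main.rs change: the old --mining-enabled option (default true) is replaced by --mining-disabled (default false), and the value is negated before being handed to the unchanged with_mining_enabled builder method. A minimal standalone sketch of the same pattern, assuming clap 4 with the derive feature (the struct below is illustrative, not the project's full Cli):

use clap::Parser;

#[derive(Parser)]
struct Cli {
    /// When set, the node only handles p2p operations and share chain sync;
    /// gRPC services are not started and no Tari base node is needed.
    #[arg(long, default_value_t = false)]
    mining_disabled: bool,
}

fn main() {
    let cli = Cli::parse();
    // The server config still thinks in terms of "mining enabled",
    // so the new flag is simply inverted here.
    let mining_enabled = !cli.mining_disabled;
    println!("mining enabled: {mining_enabled}");
}

A plain bool flag in clap is presence-only, so with a "disable" flag defaulting to false, mining stays on unless the flag is explicitly passed on the command line.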
61 changes: 35 additions & 26 deletions src/server/p2p/p2p.rs
@@ -3,35 +3,36 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use libp2p::{
gossipsub, kad, mdns, Multiaddr, noise, request_response, StreamProtocol, Swarm, tcp, yamux,
};
use libp2p::futures::StreamExt;
use libp2p::gossipsub::{IdentTopic, Message, PublishError};
use libp2p::identity::Keypair;
use libp2p::kad::store::MemoryStore;
use libp2p::kad::{Event, Mode};
use libp2p::kad::store::MemoryStore;
use libp2p::mdns::tokio::Tokio;
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{cbor, ResponseChannel};
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{
gossipsub, kad, mdns, noise, request_response, tcp, yamux, Multiaddr, StreamProtocol, Swarm,
};
use log::{debug, error, info, warn};
use tari_common::configuration::Network;
use tari_utilities::hex::Hex;
use tokio::{io, select};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, mpsc};
use tokio::{io, select};
use tokio::sync::broadcast::error::RecvError;

use crate::server::config;
use crate::server::p2p::{
client, Error, LibP2PError, messages, ServiceClient, ServiceClientChannels,
};
use crate::server::p2p::messages::{
PeerInfo, ShareChainSyncRequest, ShareChainSyncResponse, ValidateBlockRequest,
ValidateBlockResult,
};
use crate::server::p2p::peer_store::PeerStore;
use crate::server::p2p::{
client, messages, Error, LibP2PError, ServiceClient, ServiceClientChannels,
};
use crate::sharechain::block::Block;
use crate::sharechain::ShareChain;

@@ -75,8 +76,8 @@ pub struct ServerNetworkBehaviour {
/// Service is the implementation that holds every peer-to-peer related logic
/// that makes sure that all the communications, syncing, broadcasting etc... are done.
pub struct Service<S>
where
S: ShareChain + Send + Sync + 'static,
where
S: ShareChain + Send + Sync + 'static,
{
swarm: Swarm<ServerNetworkBehaviour>,
port: u16,
@@ -96,8 +97,8 @@ where
}

impl<S> Service<S>
where
S: ShareChain + Send + Sync + 'static,
where
S: ShareChain + Send + Sync + 'static,
{
/// Constructs a new Service from the provided config.
/// It also instantiates libp2p swarm inside.
@@ -202,7 +203,7 @@ where
mdns::Config::default(),
key_pair.public().to_peer_id(),
)
.map_err(|e| Error::LibP2P(LibP2PError::IO(e)))?,
.map_err(|e| Error::LibP2P(LibP2PError::IO(e)))?,
share_chain_sync: cbor::Behaviour::<
ShareChainSyncRequest,
ShareChainSyncResponse,
@@ -260,7 +261,7 @@ where
match request_raw_result {
Ok(request_raw) => {
if let Err(error) = self.swarm.behaviour_mut().gossipsub.publish(
IdentTopic::new(BLOCK_VALIDATION_REQUESTS_TOPIC),
IdentTopic::new(Self::topic_name(BLOCK_VALIDATION_REQUESTS_TOPIC)),
request_raw,
) {
error!(target: LOG_TARGET, "Failed to send block validation request: {error:?}");
@@ -286,7 +287,7 @@ where
.swarm
.behaviour_mut()
.gossipsub
.publish(IdentTopic::new(BLOCK_VALIDATION_RESULTS_TOPIC), result_raw)
.publish(IdentTopic::new(Self::topic_name(BLOCK_VALIDATION_RESULTS_TOPIC)), result_raw)
{
error!(target: LOG_TARGET, "Failed to publish block validation result: {error:?}");
}
@@ -309,7 +310,7 @@ where
self.swarm
.behaviour_mut()
.gossipsub
.publish(IdentTopic::new(PEER_INFO_TOPIC), peer_info_raw)
.publish(IdentTopic::new(Self::topic_name(PEER_INFO_TOPIC)), peer_info_raw)
.map_err(|error| Error::LibP2P(LibP2PError::Publish(error)))?;

Ok(())
@@ -326,7 +327,7 @@ where
.swarm
.behaviour_mut()
.gossipsub
.publish(IdentTopic::new(NEW_BLOCK_TOPIC), block_raw)
.publish(IdentTopic::new(Self::topic_name(NEW_BLOCK_TOPIC)), block_raw)
.map_err(|error| Error::LibP2P(LibP2PError::Publish(error)))
{
Ok(_) => {}
@@ -344,12 +345,19 @@ where
}
}

/// Generates the gossip sub topic names based on the current Tari network to avoid mixing up
/// blocks and peers with different Tari networks.
fn topic_name(topic: &str) -> String {
let network = Network::get_current_or_user_setting_or_default().to_string();
format!("{network}_{topic}")
}

/// Subscribing to a gossipsub topic.
fn subscribe(&mut self, topic: &str) {
self.swarm
.behaviour_mut()
.gossipsub
.subscribe(&IdentTopic::new(topic))
.subscribe(&IdentTopic::new(Self::topic_name(topic)))
.expect("must be subscribed to topic");
}

@@ -370,9 +378,10 @@ where
}
let peer = peer.unwrap();

let topic = message.topic.as_str();
let topic = message.topic.to_string();

match topic {
PEER_INFO_TOPIC => match messages::PeerInfo::try_from(message) {
topic if topic == Self::topic_name(PEER_INFO_TOPIC) => match messages::PeerInfo::try_from(message) {
Ok(payload) => {
debug!(target: LOG_TARGET, "New peer info: {peer:?} -> {payload:?}");
self.peer_store.add(peer, payload).await;
@@ -388,7 +397,7 @@ where
error!(target: LOG_TARGET, "Can't deserialize peer info payload: {:?}", error);
}
},
BLOCK_VALIDATION_REQUESTS_TOPIC => {
topic if topic == Self::topic_name(BLOCK_VALIDATION_REQUESTS_TOPIC) => {
match messages::ValidateBlockRequest::try_from(message) {
Ok(payload) => {
debug!(target: LOG_TARGET, "Block validation request: {payload:?}");
@@ -418,7 +427,7 @@ where
}
}
}
BLOCK_VALIDATION_RESULTS_TOPIC => {
topic if topic == Self::topic_name(BLOCK_VALIDATION_RESULTS_TOPIC) => {
match messages::ValidateBlockResult::try_from(message) {
Ok(payload) => {
let mut senders_to_delete = vec![];
@@ -438,7 +447,7 @@ where
}
}
// TODO: send a signature that proves that the actual block was coming from this peer
NEW_BLOCK_TOPIC => match Block::try_from(message) {
topic if topic == Self::topic_name(NEW_BLOCK_TOPIC) => match Block::try_from(message) {
Ok(payload) => {
info!(target: LOG_TARGET,"🆕 New block from broadcast: {:?}", &payload.hash().to_hex());
if let Err(error) = self.share_chain.submit_block(&payload).await {
@@ -449,7 +458,7 @@ where
error!(target: LOG_TARGET, "Can't deserialize broadcast block payload: {:?}", error);
}
},
&_ => {
_ => {
warn!(target: LOG_TARGET, "Unknown topic {topic:?}!");
}
}
@@ -493,7 +502,7 @@ where
while self.peer_store.tip_of_block_height().await.is_none() {} // waiting for the highest blockchain
match self.peer_store.tip_of_block_height().await {
Some(result) => {
debug!(target: LOG_TARGET, "Found highet block height: {result:?}");
debug!(target: LOG_TARGET, "Found highest block height: {result:?}");
match self.share_chain.tip_height().await {
Ok(tip) => {
debug!(target: LOG_TARGET, "Send share chain sync request: {result:?}");
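The bulk of the p2p.rs change is the new topic_name helper: every gossipsub topic is now prefixed with the current Tari network name (resolved through the newly added tari_common dependency's Network::get_current_or_user_setting_or_default), so mainnet and testnet nodes no longer exchange blocks or peer info on the same topics. Because the topic strings are computed at runtime, the incoming-message match can no longer use the bare topic constants as patterns and switches to guard arms instead. A self-contained sketch of that scheme, with illustrative names and a hard-coded network string standing in for the real lookup:

const NEW_BLOCK_TOPIC: &str = "new_block";
const PEER_INFO_TOPIC: &str = "peer_info";

// Mirrors the commit's topic_name helper: "<network>_<topic>".
fn topic_name(network: &str, topic: &str) -> String {
    format!("{network}_{topic}")
}

fn handle(network: &str, incoming_topic: &str) {
    // A function call cannot appear in a match pattern, hence the guard arms.
    match incoming_topic {
        t if t == topic_name(network, NEW_BLOCK_TOPIC) => println!("new block"),
        t if t == topic_name(network, PEER_INFO_TOPIC) => println!("peer info"),
        _ => println!("unknown topic {incoming_topic:?}"),
    }
}

fn main() {
    // If the network name renders as "esmeralda", the wire topic becomes "esmeralda_new_block".
    let topic = topic_name("esmeralda", NEW_BLOCK_TOPIC);
    handle("esmeralda", &topic);
}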
17 changes: 7 additions & 10 deletions src/server/server.rs
@@ -7,10 +7,10 @@ use minotari_app_grpc::tari_rpc::base_node_server::BaseNodeServer;
use minotari_app_grpc::tari_rpc::sha_p2_pool_server::ShaP2PoolServer;
use thiserror::Error;

use crate::server::{config, grpc, p2p};
use crate::server::grpc::base_node::TariBaseNodeGrpc;
use crate::server::grpc::error::TonicError;
use crate::server::grpc::p2pool::ShaP2PoolGrpc;
use crate::server::{config, grpc, p2p};
use crate::sharechain::ShareChain;

const LOG_TARGET: &str = "server";
@@ -27,8 +27,8 @@ pub enum Error {

/// Server represents the server running all the necessary components for sha-p2pool.
pub struct Server<S>
where
S: ShareChain + Send + Sync + 'static,
where
S: ShareChain + Send + Sync + 'static,
{
config: config::Config,
p2p_service: p2p::Service<S>,
@@ -38,15 +38,12 @@ where

// TODO: add graceful shutdown
impl<S> Server<S>
where
S: ShareChain + Send + Sync + 'static,
where
S: ShareChain + Send + Sync + 'static,
{
pub async fn new(config: config::Config, share_chain: S) -> Result<Self, Error> {
let share_chain = Arc::new(share_chain);

// TODO: have base node's network here and pass to p2p_service to be able to subscribe to the right gossipsub topics
// TODO: se we are not mixing main net and test net blocks.

let mut p2p_service: p2p::Service<S> = p2p::Service::new(&config, share_chain.clone())
.await
.map_err(Error::P2PService)?;
@@ -64,8 +61,8 @@ where
p2p_service.client(),
share_chain.clone(),
)
.await
.map_err(Error::Grpc)?;
.await
.map_err(Error::Grpc)?;
p2pool_server = Some(ShaP2PoolServer::new(p2pool_grpc_service));
}

5 changes: 3 additions & 2 deletions src/sharechain/in_memory.rs
@@ -10,8 +10,8 @@ use tari_utilities::epoch_time::EpochTime;
use tari_utilities::hex::Hex;
use tokio::sync::{RwLock, RwLockWriteGuard};

use crate::sharechain::{Block, MAX_BLOCKS_COUNT, SHARE_COUNT, ShareChain, ShareChainResult};
use crate::sharechain::error::{BlockConvertError, Error};
use crate::sharechain::{Block, ShareChain, ShareChainResult, MAX_BLOCKS_COUNT, SHARE_COUNT};

const LOG_TARGET: &str = "in_memory_share_chain";

@@ -100,7 +100,8 @@ impl InMemoryShareChain {
}
} else if !clear_before_add && last_block.is_none() {
return Err(Error::Empty);
} else if clear_before_add {
} else if (clear_before_add && last_block.is_none()) ||
(clear_before_add && last_block.is_some() && last_block.unwrap().height() < block.height()) {
// if we are synchronizing blocks, we trust we receive all the valid blocks
blocks.clear();
}
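The in_memory.rs change narrows when the share chain is wiped during a sync: with clear_before_add set, the existing blocks are now cleared only if the chain is still empty or the incoming block sits above the current tip, instead of unconditionally. A reduced sketch of just that decision, with illustrative names and plain heights in place of the crate's Block type:

fn should_clear(clear_before_add: bool, last_height: Option<u64>, incoming_height: u64) -> bool {
    match (clear_before_add, last_height) {
        // Syncing into an empty chain: trust the incoming blocks and start fresh.
        (true, None) => true,
        // Syncing and the incoming block is ahead of our tip: replace what we hold.
        (true, Some(tip)) if tip < incoming_height => true,
        // Otherwise keep the existing blocks.
        _ => false,
    }
}

fn main() {
    assert!(should_clear(true, None, 5));
    assert!(should_clear(true, Some(3), 5));
    assert!(!should_clear(true, Some(7), 5));
    assert!(!should_clear(false, Some(3), 5));
}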
