feat: block pool mining if not synced (#213)
Description
---
Block pool mining if we are more than 10 blocks behind a syncing peer.
This allows the node to keep solo mining, but it won't mine p2pool blocks.
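
A minimal, standalone sketch of the gating this introduces, assuming simplified types: one sync flag per PoW algorithm, cleared when our tip is more than 10 blocks behind a peer's tip and set again once a catch-up sync completes. `SyncFlags`, `update_from_peer`, and `can_pool_mine` are illustrative names only; in the actual change the two `Arc<AtomicBool>` flags live on the gRPC and p2p services and are checked in `submit_share_chain_block` and passed into `generate_shares`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Clone, Copy, Debug)]
enum PowAlgorithm {
    RandomX,
    Sha3x,
}

// Illustrative container; the real code keeps the two flags on the services directly.
struct SyncFlags {
    randomx: Arc<AtomicBool>,
    sha3x: Arc<AtomicBool>,
}

impl SyncFlags {
    fn new() -> Self {
        Self {
            randomx: Arc::new(AtomicBool::new(false)),
            sha3x: Arc::new(AtomicBool::new(false)),
        }
    }

    fn flag(&self, algo: PowAlgorithm) -> &Arc<AtomicBool> {
        match algo {
            PowAlgorithm::RandomX => &self.randomx,
            PowAlgorithm::Sha3x => &self.sha3x,
        }
    }

    /// Clear the flag for `algo` when our tip is more than 10 blocks behind a peer's tip.
    fn update_from_peer(&self, algo: PowAlgorithm, our_height: u64, their_height: u64) {
        if our_height < their_height.saturating_sub(10) {
            self.flag(algo).store(false, Ordering::SeqCst);
        }
    }

    /// If false, the node skips p2pool share submission and falls back to solo mining.
    fn can_pool_mine(&self, algo: PowAlgorithm) -> bool {
        self.flag(algo).load(Ordering::SeqCst)
    }
}

fn main() {
    let flags = SyncFlags::new();

    // A completed catch-up sync flips the SHA3x flag to true.
    flags.flag(PowAlgorithm::Sha3x).store(true, Ordering::SeqCst);
    assert!(flags.can_pool_mine(PowAlgorithm::Sha3x));

    // A peer reports a tip more than 10 blocks ahead of ours: stop pool mining
    // on that chain (the node keeps solo mining in the meantime).
    flags.update_from_peer(PowAlgorithm::Sha3x, 100, 120);
    assert!(!flags.can_pool_mine(PowAlgorithm::Sha3x));

    // The RandomX chain is tracked independently; a peer only 5 blocks ahead
    // leaves its flag untouched.
    flags.flag(PowAlgorithm::RandomX).store(true, Ordering::SeqCst);
    flags.update_from_peer(PowAlgorithm::RandomX, 100, 105);
    assert!(flags.can_pool_mine(PowAlgorithm::RandomX));
}
```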

---------

Co-authored-by: stringhandler <[email protected]>
SWvheerden and stringhandler authored Dec 6, 2024
1 parent a597dfc commit 503249c
Showing 5 changed files with 111 additions and 55 deletions.
45 changes: 31 additions & 14 deletions src/server/grpc/p2pool.rs
@@ -79,7 +79,8 @@ where S: ShareChain
list_of_templates_sha3x: RwLock<VecDeque<FixedHash>>,
template_store_rx: RwLock<HashMap<FixedHash, P2Block>>,
list_of_templates_rx: RwLock<VecDeque<FixedHash>>,
are_we_synced_with_p2pool: Arc<AtomicBool>,
are_we_synced_with_randomx_p2pool: Arc<AtomicBool>,
are_we_synced_with_sha3x_p2pool: Arc<AtomicBool>,
}

impl<S> ShaP2PoolGrpc<S>
@@ -97,7 +98,8 @@ where S: ShareChain
genesis_block_hash: FixedHash,
stats_broadcast: StatsBroadcastClient,
squad: Squad,
are_we_synced_with_p2pool: Arc<AtomicBool>,
are_we_synced_with_randomx_p2pool: Arc<AtomicBool>,
are_we_synced_with_sha3x_p2pool: Arc<AtomicBool>,
) -> Result<Self, Error> {
Ok(Self {
local_peer_id,
@@ -121,17 +123,28 @@ where S: ShareChain
list_of_templates_sha3x: RwLock::new(VecDeque::with_capacity(MAX_STORED_TEMPLATES_SHA3X + 1)),
template_store_rx: RwLock::new(HashMap::new()),
list_of_templates_rx: RwLock::new(VecDeque::with_capacity(MAX_STORED_TEMPLATES_RX + 1)),
are_we_synced_with_p2pool,
are_we_synced_with_randomx_p2pool,
are_we_synced_with_sha3x_p2pool,
})
}

/// Submits a new block to share chain and broadcasts to the p2p network.
pub async fn submit_share_chain_block(&self, block: Arc<P2Block>) -> Result<(), Status> {
if !self.are_we_synced_with_p2pool.load(Ordering::Relaxed) {
info!(target: LOG_TARGET, "We are not synced yet, not submitting block atm");
return Ok(());
}
let pow_algo = block.original_header.pow.pow_algo;
match pow_algo {
PowAlgorithm::RandomX => {
if !self.are_we_synced_with_randomx_p2pool.load(Ordering::SeqCst) {
info!(target: LOG_TARGET, "We are not synced yet, not submitting block atm");
return Ok(());
}
},
PowAlgorithm::Sha3x => {
if !self.are_we_synced_with_sha3x_p2pool.load(Ordering::SeqCst) {
info!(target: LOG_TARGET, "We are not synced yet, not submitting block atm");
return Ok(());
}
},
};
let share_chain = match pow_algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
@@ -204,9 +217,15 @@ where S: ShareChain
.map_err(|error| Status::failed_precondition(format!("Invalid wallet payment address: {}", error)))?;

// request new block template with shares as coinbases
let share_chain = match pow_algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
let (share_chain, synced_status) = match pow_algo {
PowAlgorithm::RandomX => (
self.share_chain_random_x.clone(),
self.are_we_synced_with_randomx_p2pool.load(Ordering::SeqCst),
),
PowAlgorithm::Sha3x => (
self.share_chain_sha3x.clone(),
self.are_we_synced_with_sha3x_p2pool.load(Ordering::SeqCst),
),
};
let coinbase_extra =
convert_coinbase_extra(self.squad.clone(), grpc_req.coinbase_extra).unwrap_or_default();
@@ -217,7 +236,7 @@ where S: ShareChain
.clone();
// dbg!(&new_tip_block.height, &new_tip_block.hash);
let shares = share_chain
.generate_shares(&new_tip_block)
.generate_shares(&new_tip_block, !synced_status)
.await
.map_err(|error| Status::internal(format!("failed to generate shares {error:?}")))?;

@@ -290,9 +309,7 @@ where S: ShareChain
}

// what happens p2pool difficulty > base chain diff
if target_difficulty.as_u64() < miner_data.target_difficulty &&
self.are_we_synced_with_p2pool.load(Ordering::Relaxed)
{
if target_difficulty.as_u64() < miner_data.target_difficulty && synced_status {
miner_data.target_difficulty = target_difficulty.as_u64();
}
}
39 changes: 28 additions & 11 deletions src/server/p2p/network.rs
@@ -313,7 +313,8 @@ where S: ShareChain
inner_request_rx: mpsc::UnboundedReceiver<InnerRequest>,

relay_store: Arc<RwLock<RelayStore>>,
are_we_synced_with_p2pool: Arc<AtomicBool>,
are_we_synced_with_randomx_p2pool: Arc<AtomicBool>,
are_we_synced_with_sha3x_p2pool: Arc<AtomicBool>,
stats_broadcast_client: StatsBroadcastClient,
randomx_sync_semaphore: Arc<Semaphore>,
sha3x_sync_semaphore: Arc<Semaphore>,
@@ -335,7 +336,8 @@ where S: ShareChain
share_chain_random_x: Arc<S>,
network_peer_store: PeerStore,
shutdown_signal: ShutdownSignal,
are_we_synced_with_p2pool: Arc<AtomicBool>,
are_we_synced_with_randomx_p2pool: Arc<AtomicBool>,
are_we_synced_with_sha3x_p2pool: Arc<AtomicBool>,
stats_broadcast_client: StatsBroadcastClient,
) -> Result<Self, Error> {
let swarm = setup::new_swarm(config).await?;
@@ -376,7 +378,8 @@ where S: ShareChain
query_tx,
query_rx,
relay_store: Arc::new(RwLock::new(RelayStore::default())),
are_we_synced_with_p2pool,
are_we_synced_with_randomx_p2pool,
are_we_synced_with_sha3x_p2pool,
stats_broadcast_client,
randomx_sync_semaphore: Arc::new(Semaphore::new(config.p2p_service.num_concurrent_syncs)),
sha3x_sync_semaphore: Arc::new(Semaphore::new(config.p2p_service.num_concurrent_syncs)),
@@ -1546,14 +1549,20 @@ where S: ShareChain

let timer = Instant::now();
let algo = response.algo();
let share_chain = match algo {
let (share_chain, synced_bool) = match algo {
PowAlgorithm::RandomX => {
self.randomx_last_sync_requested_block = None;
self.share_chain_random_x.clone()
(
self.share_chain_random_x.clone(),
self.are_we_synced_with_randomx_p2pool.clone(),
)
},
PowAlgorithm::Sha3x => {
self.sha3x_last_sync_requested_block = None;
self.share_chain_sha3x.clone()
(
self.share_chain_sha3x.clone(),
self.are_we_synced_with_sha3x_p2pool.clone(),
)
},
};
let their_tip_hash = *response.tip_hash();
@@ -1567,7 +1576,6 @@ where S: ShareChain
let tx = self.inner_request_tx.clone();
let squad = self.config.squad.clone();
let network_peer_store = self.network_peer_store.clone();
let synced_bool = self.are_we_synced_with_p2pool.clone();
let recent_synced_tips = self.recent_synced_tips.get(&algo).cloned().unwrap();

tokio::spawn(async move {
@@ -1636,10 +1644,8 @@ where S: ShareChain
let _unused = tx.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
} else {
info!(target: SYNC_REQUEST_LOG_TARGET, "Catch up sync completed for chain {} from {} after {} catchups", algo, peer, num_catchups.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()));

// this only gets called after sync completes. So lets make sure we can mine since we completed
// a sync
synced_bool.store(true, std::sync::atomic::Ordering::Relaxed);
// this only gets called after sync completes, lets set synced status = true
synced_bool.store(true, std::sync::atomic::Ordering::SeqCst);
peer_store_write_lock.reset_catch_up_attempts(&peer);
}
if timer.elapsed() > MAX_ACCEPTABLE_P2P_MESSAGE_TIMEOUT {
@@ -1791,6 +1797,17 @@ where S: ShareChain
},
}

// get the sync bool
let sync_status = match algo {
PowAlgorithm::RandomX => self.are_we_synced_with_randomx_p2pool.clone(),
PowAlgorithm::Sha3x => self.are_we_synced_with_sha3x_p2pool.clone(),
};
let our_tip = share_chain.get_tip().await?.unwrap_or_default();
if our_tip.0 < their_height.saturating_sub(10) {
info!(target: SYNC_REQUEST_LOG_TARGET, "We({}) are out by more than 10 blocks from syncing peer({}), setting sync status to false", our_tip.0, their_height);
sync_status.store(false, std::sync::atomic::Ordering::SeqCst);
}

let outbound_request_id = self.swarm.behaviour_mut().catch_up_sync.send_request(
&peer,
CatchUpSyncRequest::new(algo, i_have_blocks, last_block_from_them),
21 changes: 14 additions & 7 deletions src/server/server.rs
@@ -39,7 +39,8 @@ where S: ShareChain
http_server: Option<Arc<HttpServer>>,
stats_collector: Option<StatsCollector>,
shutdown_signal: ShutdownSignal,
are_we_synced_with_p2pool: Arc<AtomicBool>,
are_we_synced_with_randomx_p2pool: Arc<AtomicBool>,
are_we_synced_with_sha3x_p2pool: Arc<AtomicBool>,
}

impl<S> Server<S>
@@ -56,7 +57,8 @@ where S: ShareChain
let share_chain_sha3x = Arc::new(share_chain_sha3x);
let share_chain_random_x = Arc::new(share_chain_random_x);
let network_peer_store = PeerStore::new(stats_broadcast_client.clone());
let are_we_synced_with_p2pool = Arc::new(AtomicBool::new(false));
let are_we_synced_with_randomx_p2pool = Arc::new(AtomicBool::new(false));
let are_we_synced_with_sha3x_p2pool = Arc::new(AtomicBool::new(false));
let stats_client = stats_collector.create_client();

let mut p2p_service: p2p::Service<S> = p2p::Service::new(
@@ -65,7 +67,8 @@
share_chain_random_x.clone(),
network_peer_store,
shutdown_signal.clone(),
are_we_synced_with_p2pool.clone(),
are_we_synced_with_randomx_p2pool.clone(),
are_we_synced_with_sha3x_p2pool.clone(),
stats_broadcast_client.clone(),
)
.await?;
@@ -93,7 +96,8 @@
genesis_block_hash,
stats_broadcast_client.clone(),
config.p2p_service.squad.clone(),
are_we_synced_with_p2pool.clone(),
are_we_synced_with_randomx_p2pool.clone(),
are_we_synced_with_sha3x_p2pool.clone(),
)
.await?;
p2pool_server = Some(ShaP2PoolServer::new(p2pool_grpc_service));
@@ -119,7 +123,8 @@
http_server,
stats_collector: Some(stats_collector),
shutdown_signal,
are_we_synced_with_p2pool,
are_we_synced_with_randomx_p2pool,
are_we_synced_with_sha3x_p2pool,
})
}

@@ -148,12 +153,14 @@
pub async fn start(&mut self) -> Result<(), Error> {
info!(target: LOG_TARGET, "⛏ Starting Tari SHA-3 mining P2Pool...");

let sync_start = self.are_we_synced_with_p2pool.clone();
let sync_start_sha3 = self.are_we_synced_with_sha3x_p2pool.clone();
let sync_start_rx = self.are_we_synced_with_randomx_p2pool.clone();
let time = self.config.network_silence_delay;
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(time)).await;
info!(target: LOG_TARGET, "Network silence, Setting as synced");
sync_start.store(true, std::sync::atomic::Ordering::Relaxed);
sync_start_sha3.store(true, std::sync::atomic::Ordering::SeqCst);
sync_start_rx.store(true, std::sync::atomic::Ordering::SeqCst);
});

if !self.config.p2p_service.is_seed_peer {
55 changes: 33 additions & 22 deletions src/sharechain/in_memory.rs
@@ -509,38 +509,49 @@ impl ShareChain for InMemoryShareChain
// res
// }

async fn generate_shares(&self, new_tip_block: &P2Block) -> Result<Vec<NewBlockCoinbase>, ShareChainError> {
async fn generate_shares(
&self,
new_tip_block: &P2Block,
solo_mine: bool,
) -> Result<Vec<NewBlockCoinbase>, ShareChainError> {
let mut chain_read_lock = self.p2_chain.read().await;
// first check if there is a cached hashmap of shares
let mut miners_to_shares = if let Some(ref cached_shares) = chain_read_lock.cached_shares {
cached_shares.clone()
} else {
let mut miners_to_shares = if solo_mine {
HashMap::new()
} else {
let mut miners_to_shares = if let Some(ref cached_shares) = chain_read_lock.cached_shares {
cached_shares.clone()
} else {
HashMap::new()
};
if miners_to_shares.is_empty() {
drop(chain_read_lock);
// if there is none, lets see if we need to calculate one
let mut wl = self.p2_chain.write().await;
miners_to_shares = self.get_calculate_and_cache_hashmap_of_shares(&mut wl).await?;
chain_read_lock = wl.downgrade();
}
miners_to_shares
};
if miners_to_shares.is_empty() {
drop(chain_read_lock);
// if there is none, lets see if we need to calculate one
let mut wl = self.p2_chain.write().await;
miners_to_shares = self.get_calculate_and_cache_hashmap_of_shares(&mut wl).await?;
chain_read_lock = wl.downgrade();
}

// lets add the new tip block to the hashmap
miners_to_shares.insert(
new_tip_block.miner_wallet_address.to_base58(),
(MAIN_REWARD_SHARE, new_tip_block.miner_coinbase_extra.clone()),
);
for uncle in &new_tip_block.uncles {
let uncle_block = chain_read_lock
.level_at_height(uncle.0)
.ok_or(ShareChainError::UncleBlockNotFound)?
.blocks
.get(&uncle.1)
.ok_or(ShareChainError::UncleBlockNotFound)?;
miners_to_shares.insert(
uncle_block.miner_wallet_address.to_base58(),
(UNCLE_REWARD_SHARE, uncle_block.miner_coinbase_extra.clone()),
);
if !solo_mine {
for uncle in &new_tip_block.uncles {
let uncle_block = chain_read_lock
.level_at_height(uncle.0)
.ok_or(ShareChainError::UncleBlockNotFound)?
.blocks
.get(&uncle.1)
.ok_or(ShareChainError::UncleBlockNotFound)?;
miners_to_shares.insert(
uncle_block.miner_wallet_address.to_base58(),
(UNCLE_REWARD_SHARE, uncle_block.miner_coinbase_extra.clone()),
);
}
}

let mut res = vec![];
6 changes: 5 additions & 1 deletion src/sharechain/mod.rs
@@ -99,7 +99,11 @@ pub(crate) trait ShareChain: Send + Sync + 'static {
async fn get_tip(&self) -> Result<Option<(u64, FixedHash)>, ShareChainError>;

/// Generate shares based on the previous blocks.
async fn generate_shares(&self, new_tip_block: &P2Block) -> Result<Vec<NewBlockCoinbase>, ShareChainError>;
async fn generate_shares(
&self,
new_tip_block: &P2Block,
solo_mine: bool,
) -> Result<Vec<NewBlockCoinbase>, ShareChainError>;

/// Generate a new block on tip of the chain.
async fn generate_new_tip_block(
