diff --git a/crates/actors/src/packing.rs b/crates/actors/src/packing.rs index 8c5801031..12c60566d 100644 --- a/crates/actors/src/packing.rs +++ b/crates/actors/src/packing.rs @@ -26,7 +26,7 @@ pub struct PackingRequest { } pub type AtomicPackingJobQueue = Arc>>; -pub type PackingJobsBySM = HashMap; +pub type PackingJobsBySM = Arc>; pub type PackingSemaphore = Arc; @@ -76,10 +76,12 @@ impl PackingActor { config: PackingConfig, ) -> Self { let semaphore = Arc::new(Semaphore::new(config.concurrency.into())); - let pending_jobs = storage_module_ids - .iter() - .map(|s| (*s, Arc::new(RwLock::new(VecDeque::with_capacity(32))))) - .collect(); + let pending_jobs = Arc::new( + storage_module_ids + .iter() + .map(|s| (*s, Arc::new(RwLock::new(VecDeque::with_capacity(32))))) + .collect(), + ); Self { task_executor, @@ -321,9 +323,9 @@ pub struct GetInternals(); #[derive(Debug, MessageResponse, Clone)] pub struct Internals { - pending_jobs: PackingJobsBySM, - semaphore: PackingSemaphore, - config: PackingConfig, + pub pending_jobs: PackingJobsBySM, + pub semaphore: PackingSemaphore, + pub config: PackingConfig, } impl Handler for PackingActor { diff --git a/crates/chain/tests/multi_node/fork_recovery_epoch.rs b/crates/chain/tests/multi_node/fork_recovery_epoch.rs index c942372e3..4d2d0f1f7 100644 --- a/crates/chain/tests/multi_node/fork_recovery_epoch.rs +++ b/crates/chain/tests/multi_node/fork_recovery_epoch.rs @@ -1,4 +1,5 @@ use crate::utils::IrysNodeTest; +use irys_actors::packing::GetInternals; use irys_testing_utils::*; use irys_types::{NodeConfig, H256}; use tracing::debug; @@ -66,6 +67,32 @@ async fn heavy_fork_recovery_epoch_test() -> eyre::Result<()> { .wait_for_mempool(peer2_pledge_tx.id, seconds_to_wait) .await?; + // grab the packing actor's semaphores for peer1 & 2 + + let peer1_internals = peer1_node + .node_ctx + .actor_addresses + .packing + .send(GetInternals()) + .await?; + // grab all of peer1's semaphores + let peer1_permits = peer1_internals + .semaphore + .acquire_many(peer1_internals.semaphore.available_permits().try_into()?) + .await?; + + let peer2_internals = peer2_node + .node_ctx + .actor_addresses + .packing + .send(GetInternals()) + .await?; + // grab all of peer2's semaphores + let peer2_permits = peer2_internals + .semaphore + .acquire_many(peer2_internals.semaphore.available_permits().try_into()?) + .await?; + // Mine a block to get the commitments included genesis_node.mine_block().await.unwrap(); @@ -84,6 +111,37 @@ async fn heavy_fork_recovery_epoch_test() -> eyre::Result<()> { let _block_hash = peer1_node.wait_until_height(2, seconds_to_wait).await?; let _block_hash = peer2_node.wait_until_height(2, seconds_to_wait).await?; + // assert that the packing actors internal state has some pending jobs for the new assignment + // (it shouldn't dequeue any as we have all the semaphores) + + let found_peer1_packing_job = peer1_internals.pending_jobs.iter().any(|(_id, jobs)| { + return jobs.read().unwrap().iter().any(|job| { + job.storage_module.partition_assignment() == peer1_assignments.first().copied() + }); + }); + + assert!( + found_peer1_packing_job, + "Unable to get peer1 packing job for {}", + &peer1_assignments.first().unwrap().partition_hash + ); + + let found_peer2_packing_job = peer2_internals.pending_jobs.iter().any(|(_id, jobs)| { + return jobs.read().unwrap().iter().any(|job| { + job.storage_module.partition_assignment() == peer2_assignments.first().copied() + }); + }); + + assert!( + found_peer2_packing_job, + "Unable to get peer2 packing job for {}", + &peer2_assignments.first().unwrap().partition_hash + ); + + // give back all the semaphores + drop(peer1_permits); + drop(peer2_permits); + // Wait for them to pack their storage modules with the partition_hashes peer1_node.wait_for_packing(seconds_to_wait).await; peer2_node.wait_for_packing(seconds_to_wait).await;