Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions crates/actors/src/packing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct PackingRequest {
}

pub type AtomicPackingJobQueue = Arc<RwLock<VecDeque<PackingRequest>>>;
pub type PackingJobsBySM = HashMap<usize, AtomicPackingJobQueue>;
pub type PackingJobsBySM = Arc<HashMap<usize, AtomicPackingJobQueue>>;

pub type PackingSemaphore = Arc<Semaphore>;

Expand Down Expand Up @@ -76,10 +76,12 @@ impl PackingActor {
config: PackingConfig,
) -> Self {
let semaphore = Arc::new(Semaphore::new(config.concurrency.into()));
let pending_jobs = storage_module_ids
.iter()
.map(|s| (*s, Arc::new(RwLock::new(VecDeque::with_capacity(32)))))
.collect();
let pending_jobs = Arc::new(
storage_module_ids
.iter()
.map(|s| (*s, Arc::new(RwLock::new(VecDeque::with_capacity(32)))))
.collect(),
);

Self {
task_executor,
Expand Down Expand Up @@ -321,9 +323,9 @@ pub struct GetInternals();

#[derive(Debug, MessageResponse, Clone)]
pub struct Internals {
pending_jobs: PackingJobsBySM,
semaphore: PackingSemaphore,
config: PackingConfig,
pub pending_jobs: PackingJobsBySM,
pub semaphore: PackingSemaphore,
pub config: PackingConfig,
}

impl Handler<GetInternals> for PackingActor {
Expand Down
58 changes: 58 additions & 0 deletions crates/chain/tests/multi_node/fork_recovery_epoch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::utils::IrysNodeTest;
use irys_actors::packing::GetInternals;
use irys_testing_utils::*;
use irys_types::{NodeConfig, H256};
use tracing::debug;
Expand Down Expand Up @@ -66,6 +67,32 @@ async fn heavy_fork_recovery_epoch_test() -> eyre::Result<()> {
.wait_for_mempool(peer2_pledge_tx.id, seconds_to_wait)
.await?;

// grab the packing actor's semaphores for peer1 & 2

let peer1_internals = peer1_node
.node_ctx
.actor_addresses
.packing
.send(GetInternals())
.await?;
// grab all of peer1's semaphores
let peer1_permits = peer1_internals
.semaphore
.acquire_many(peer1_internals.semaphore.available_permits().try_into()?)
.await?;

let peer2_internals = peer2_node
.node_ctx
.actor_addresses
.packing
.send(GetInternals())
.await?;
// grab all of peer2's semaphores
let peer2_permits = peer2_internals
.semaphore
.acquire_many(peer2_internals.semaphore.available_permits().try_into()?)
.await?;

// Mine a block to get the commitments included
genesis_node.mine_block().await.unwrap();

Expand All @@ -84,6 +111,37 @@ async fn heavy_fork_recovery_epoch_test() -> eyre::Result<()> {
let _block_hash = peer1_node.wait_until_height(2, seconds_to_wait).await?;
let _block_hash = peer2_node.wait_until_height(2, seconds_to_wait).await?;

// assert that the packing actors internal state has some pending jobs for the new assignment
// (it shouldn't dequeue any as we have all the semaphores)

let found_peer1_packing_job = peer1_internals.pending_jobs.iter().any(|(_id, jobs)| {
return jobs.read().unwrap().iter().any(|job| {
job.storage_module.partition_assignment() == peer1_assignments.first().copied()
});
});

assert!(
found_peer1_packing_job,
"Unable to get peer1 packing job for {}",
&peer1_assignments.first().unwrap().partition_hash
);

let found_peer2_packing_job = peer2_internals.pending_jobs.iter().any(|(_id, jobs)| {
return jobs.read().unwrap().iter().any(|job| {
job.storage_module.partition_assignment() == peer2_assignments.first().copied()
});
});

assert!(
found_peer2_packing_job,
"Unable to get peer2 packing job for {}",
&peer2_assignments.first().unwrap().partition_hash
);

// give back all the semaphores
drop(peer1_permits);
drop(peer2_permits);

// Wait for them to pack their storage modules with the partition_hashes
peer1_node.wait_for_packing(seconds_to_wait).await;
peer2_node.wait_for_packing(seconds_to_wait).await;
Expand Down
Loading