Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
7c2c6f2
port packing actor to Tokio-native service while keeping full backwar…
craigmayhew Oct 1, 2025
54e01a5
migrate call sites to use the Tokio packing handle while keeping the …
craigmayhew Oct 1, 2025
6db926f
Replace Actix-based PackingActor with a Tokio-native PackingService a…
craigmayhew Oct 1, 2025
bc51f8e
remove unused imports
craigmayhew Oct 1, 2025
35633e7
Merge branch 'master' into craig/actix-tokio-packing-actor
craigmayhew Oct 1, 2025
f4fc14d
refactor: make packing handle mandatory and wire it through services …
craigmayhew Oct 1, 2025
a587b00
remove dead lines
craigmayhew Oct 1, 2025
52c32da
refactor: remove packing handle from ActorAddresses; use ServiceSenders
craigmayhew Oct 1, 2025
08b7503
whitespace
craigmayhew Oct 1, 2025
29df2c1
actor_addresses field removed
craigmayhew Oct 1, 2025
ddc8af4
Merge branch 'master' into craig/actix-tokio-packing-actor
craigmayhew Oct 1, 2025
fd229cb
error! when receiing packing request for unknown storage module id
craigmayhew Oct 2, 2025
eaeadc5
update comments that still mention actix
craigmayhew Oct 2, 2025
6eabf61
bound the sender to 5_000
craigmayhew Oct 2, 2025
e9514c6
yield for every ms, try send for 50 ms and then potentially fail
craigmayhew Oct 2, 2025
c5d539b
tidy tokio imports
craigmayhew Oct 2, 2025
34bd28f
merge use lines
craigmayhew Oct 2, 2025
a986948
Merge branch 'master' into craig/actix-tokio-packing-actor
craigmayhew Oct 2, 2025
a37a23c
perf: Notify/event-driven wait_for_packing and associated wiring
craigmayhew Oct 2, 2025
6bc4914
perf: replace 500 ms warmup with event-driven short wait using Notify
craigmayhew Oct 2, 2025
473f213
test: oneshot channel instead of a shared RwLock
craigmayhew Oct 2, 2025
a71dc6b
switch some debug to trace
craigmayhew Oct 2, 2025
4883d54
removed logic for checking if channel is saturated, just drop and warn
craigmayhew Oct 2, 2025
5df1d69
compiles
craigmayhew Oct 2, 2025
4af3221
remove unused imports
craigmayhew Oct 2, 2025
fa3b66c
tidy imports
craigmayhew Oct 2, 2025
7bbbef0
whitespace
craigmayhew Oct 2, 2025
78fbcdb
tracing::debug import fix for nvidia feature flag
craigmayhew Oct 2, 2025
d23dd57
Merge branch 'master' into craig/actix-tokio-partitions
craigmayhew Oct 3, 2025
fcbc914
Merge branch 'master' into craig/actix-tokio-partitions
craigmayhew Oct 3, 2025
539a305
Merge branch 'master' into craig/actix-tokio-partitions
craigmayhew Oct 3, 2025
c3dd764
Merge branch 'master' into craig/actix-tokio-partitions
craigmayhew Oct 6, 2025
d6dae2e
Merge remote-tracking branch 'origin/master' into craig/actix-tokio-p…
craigmayhew Oct 10, 2025
0e5f175
send -> try_send correction from merge
craigmayhew Oct 10, 2025
4ff4c84
unused import
craigmayhew Oct 10, 2025
6f546a1
Merge branch 'master' into craig/actix-tokio-partitions
craigmayhew Oct 13, 2025
b00ae93
remove ActorAddresses
craigmayhew Oct 13, 2025
af9e42f
ensure a closed cmd_rx channel does not hog the biased select
craigmayhew Oct 13, 2025
bd90f14
actix refactored out of partition_expiration_and_repacking_test()
craigmayhew Oct 13, 2025
99f9f14
add a summary doc to partition_expiration_and_repacking_test()
craigmayhew Oct 13, 2025
aa12f0d
replace comment with warn!
craigmayhew Oct 13, 2025
1872c23
Refactor test_solution() test to Tokio service
craigmayhew Oct 14, 2025
d88d593
unused lint rule
craigmayhew Oct 14, 2025
052025d
Merge branch 'master' into craig/actix-tokio-partitions
craigmayhew Oct 14, 2025
f99fb25
add test_get_recall_range helper
craigmayhew Oct 14, 2025
f70b4de
refactor test_recall_range_reinit() to use tokio
craigmayhew Oct 14, 2025
f694ce2
actix_rt::test -> tokio::test
craigmayhew Oct 14, 2025
974fee7
fix: debug argument order
craigmayhew Oct 14, 2025
83b6c06
maximise try_send failure logging
craigmayhew Oct 14, 2025
297d8c7
Merge branch 'master' into craig/actix-tokio-partitions
craigmayhew Oct 14, 2025
40e679b
simplify handling of closed ctrl channel
craigmayhew Oct 14, 2025
81d41f4
keep controller handle alive during test
craigmayhew Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions crates/actors/src/addresses.rs

This file was deleted.

174 changes: 130 additions & 44 deletions crates/actors/src/broadcast_mining_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ use actix::prelude::*;
use irys_types::{block_production::Seed, H256List, IrysBlockHeader};
use irys_vdf::MiningBroadcaster;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, info, Span};
// Message types

/// Subscribes a `PartitionMiningActor` so the broadcaster to receive broadcast messages
/// Subscribes a `PartitionMiningActor` so the broadcaster can send broadcast messages
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub struct Subscribe(pub Addr<PartitionMiningActor>);

/// Unsubscribes a `PartitionMiningActor` so from the broadcaster
/// Unsubscribes a `PartitionMiningActor` from the broadcaster
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub struct Unsubscribe(pub Addr<PartitionMiningActor>);

/// Send the most recent mining step to all the `PartitionMiningActors`
/// Send the most recent mining step to all subscribers
#[derive(Message, Debug, Clone)]
#[rtype(result = "()")]
pub struct BroadcastMiningSeed {
Expand All @@ -25,7 +25,7 @@ pub struct BroadcastMiningSeed {
pub global_step: u64,
}

/// Send the latest difficulty update to all the `PartitionMiningActors`
/// Send the latest difficulty update to all subscribers
#[derive(Message, Debug, Clone)]
#[rtype(result = "()")]
pub struct BroadcastDifficultyUpdate(pub Arc<IrysBlockHeader>);
Expand All @@ -35,54 +35,111 @@ pub struct BroadcastDifficultyUpdate(pub Arc<IrysBlockHeader>);
#[rtype(result = "()")]
pub struct BroadcastPartitionsExpiration(pub H256List);

/// Tokio-side broadcast envelope so non-Actix subscribers can receive events
#[derive(Debug, Clone)]
pub enum MiningBroadcastEvent {
Seed(BroadcastMiningSeed),
Difficulty(BroadcastDifficultyUpdate),
PartitionsExpiration(BroadcastPartitionsExpiration),
}

/// Subscribe a Tokio channel to receive broadcast messages
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub struct SubscribeTokio(pub UnboundedSender<MiningBroadcastEvent>);

/// Unsubscribe a Tokio channel from broadcast messages
///
/// Note: Unsubscribe is best-effort; we primarily prune closed channels on send failure.
/// This is provided for symmetry with the Actix API.
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub struct UnsubscribeTokio(pub UnboundedSender<MiningBroadcastEvent>);

/// Broadcaster actor
#[derive(Debug, Default)]
pub struct BroadcastMiningService {
pub subscribers: Vec<Addr<PartitionMiningActor>>,
pub tokio_subscribers: Vec<UnboundedSender<MiningBroadcastEvent>>,
pub span: Option<Span>,
}

// Actor Definition
impl Actor for BroadcastMiningService {
type Context = Context<Self>;
}

/// Adds this actor the the local service registry
impl Supervised for BroadcastMiningService {}

impl SystemService for BroadcastMiningService {
fn service_started(&mut self, _ctx: &mut Context<Self>) {
debug!("service started: broadcast_mining (Default)");
}
}

impl BroadcastMiningService {
/// Initialize a new `MiningBroadcaster`
pub fn new(span: Option<Span>) -> Self {
Self {
subscribers: Vec::new(),
tokio_subscribers: Vec::new(),
span: Some(span.unwrap_or(Span::current())),
}
}
}

impl Actor for BroadcastMiningService {
type Context = Context<Self>;
}
fn with_span<F: FnOnce()>(span: &Option<Span>, f: F) {
if let Some(span) = span {
let span = span.clone();
let _entered = span.enter();
f();
} else {
f();
}
}

/// Adds this actor the the local service registry
impl Supervised for BroadcastMiningService {}
fn broadcast_to_actix(
&mut self,
seed: Option<&BroadcastMiningSeed>,
diff: Option<&BroadcastDifficultyUpdate>,
exp: Option<&BroadcastPartitionsExpiration>,
) {
// prune disconnected actix subscribers
self.subscribers.retain(actix::Addr::connected);

impl SystemService for BroadcastMiningService {
fn service_started(&mut self, _ctx: &mut Context<Self>) {
debug!("service started: broadcast_mining (Default)");
for sub in &self.subscribers {
if let Some(msg) = seed {
sub.do_send(msg.clone());
}
if let Some(msg) = diff {
sub.do_send(msg.clone());
}
if let Some(msg) = exp {
sub.do_send(msg.clone());
}
}
}

fn broadcast_to_tokio(&mut self, evt: MiningBroadcastEvent) {
// retain only channels that can receive
self.tokio_subscribers
.retain(|tx| tx.send(evt.clone()).is_ok());
}
}

// Handle subscriptions
// Handle Actix subscriptions
impl Handler<Subscribe> for BroadcastMiningService {
type Result = ();

fn handle(&mut self, msg: Subscribe, _: &mut Context<Self>) {
if self.span.is_some() {
let span = self.span.clone().unwrap();
let _span = span.enter();
}

debug!("PartitionMiningActor subscribed");

Self::with_span(&self.span, || {
debug!("PartitionMiningActor subscribed");
});
self.subscribers.push(msg.0);
}
}

// Handle unsubscribe
// Handle Actix unsubscribe
impl Handler<Unsubscribe> for BroadcastMiningService {
type Result = ();

Expand All @@ -91,50 +148,79 @@ impl Handler<Unsubscribe> for BroadcastMiningService {
}
}

// Handle broadcasts
// Handle Tokio subscriptions
impl Handler<SubscribeTokio> for BroadcastMiningService {
type Result = ();

fn handle(&mut self, msg: SubscribeTokio, _: &mut Context<Self>) {
Self::with_span(&self.span, || {
debug!("Tokio subscriber registered");
});
self.tokio_subscribers.push(msg.0);
}
}

// Handle Tokio unsubscribe (best-effort; primarily pruned on failed sends)
impl Handler<UnsubscribeTokio> for BroadcastMiningService {
type Result = ();

fn handle(&mut self, _msg: UnsubscribeTokio, _: &mut Context<Self>) {
// No-op: we cannot reliably compare UnboundedSender instances for removal.
// Subscribers are removed automatically if sending fails.
}
}

// Handle seed broadcasts
impl Handler<BroadcastMiningSeed> for BroadcastMiningService {
type Result = ();

fn handle(&mut self, msg: BroadcastMiningSeed, _: &mut Context<Self>) {
let span = self.span.clone().unwrap();
let _span = span.enter();
info!(
"Broadcast Mining: {:?} subs: {}",
msg.seed,
&self.subscribers.len()
);
self.subscribers.retain(actix::Addr::connected);
for subscriber in &self.subscribers {
subscriber.do_send(msg.clone());
}
let total = self.subscribers.len() + self.tokio_subscribers.len();
Self::with_span(&self.span, || {
info!("Broadcast Mining: {:?} subs: {}", msg.seed, total);
});

// Actix subscribers
self.broadcast_to_actix(Some(&msg), None, None);

// Tokio subscribers
self.broadcast_to_tokio(MiningBroadcastEvent::Seed(msg));
}
}

// Handle difficulty updates
impl Handler<BroadcastDifficultyUpdate> for BroadcastMiningService {
type Result = ();

fn handle(&mut self, msg: BroadcastDifficultyUpdate, _: &mut Context<Self>) {
self.subscribers.retain(actix::Addr::connected);
for subscriber in &self.subscribers {
subscriber.do_send(msg.clone());
}
// Actix subscribers
self.broadcast_to_actix(None, Some(&msg), None);

// Tokio subscribers
self.broadcast_to_tokio(MiningBroadcastEvent::Difficulty(msg));
}
}

// Handle partition expiration broadcasts
impl Handler<BroadcastPartitionsExpiration> for BroadcastMiningService {
type Result = ();

fn handle(&mut self, msg: BroadcastPartitionsExpiration, _: &mut Context<Self>) {
self.subscribers.retain(actix::Addr::connected);
debug!(msg = ?msg.0, "Broadcasting expiration, expired partition hashes");
for subscriber in &self.subscribers {
subscriber.do_send(msg.clone());
}
Self::with_span(&self.span, || {
debug!(msg = ?msg.0, "Broadcasting expiration, expired partition hashes");
});

// Actix subscribers
self.broadcast_to_actix(None, None, Some(&msg));

// Tokio subscribers
self.broadcast_to_tokio(MiningBroadcastEvent::PartitionsExpiration(msg));
}
}

#[derive(Debug, Clone)]
pub struct MiningServiceBroadcaster(Addr<BroadcastMiningService>);

impl MiningBroadcaster for MiningServiceBroadcaster {
fn broadcast(&self, seed: Seed, checkpoints: H256List, global_step: u64) {
self.0.do_send(BroadcastMiningSeed {
Expand Down
4 changes: 2 additions & 2 deletions crates/actors/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
mod addresses;
pub mod block_discovery;
pub mod block_index_service;
pub mod block_producer;
Expand All @@ -12,17 +11,18 @@ pub mod data_sync_service;
pub mod mempool_service;
pub mod mining;
pub mod packing;
pub mod partition_mining_service;
pub mod reth_service;
pub mod services;
pub mod shadow_tx_generator;
pub mod storage_module_service;
pub mod test_helpers;
pub mod validation_service;

pub use addresses::*;
pub use block_producer::*;
pub use data_sync_service::*;
pub use mempool_service::*;
pub use partition_mining_service::*;
pub use reth_ethereum_primitives;
pub use shadow_tx_generator::ShadowMetadata;
pub use storage_module_service::*;
Expand Down
Loading