diff --git a/crates/delegator/src/bid_queue.rs b/crates/delegator/src/bid_queue.rs index c958442..b00f3cc 100644 --- a/crates/delegator/src/bid_queue.rs +++ b/crates/delegator/src/bid_queue.rs @@ -5,6 +5,8 @@ use zetina_common::process::Process; pub struct BidQueue {} +pub type BidQueueResult = Result<(kad::RecordKey, BTreeMap>), BidControllerError>; + impl BidQueue { pub fn new() -> Self { Self {} @@ -26,46 +28,38 @@ impl BidQueue { ) { let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10); let (bid_tx, mut bid_rx) = mpsc::channel::<(u64, PeerId)>(10); - let future: Pin< - Box< - dyn Future< - Output = Result< - (kad::RecordKey, BTreeMap>), - BidControllerError, - >, - > + Send - + '_, - >, - > = Box::pin(async move { - let duration = Duration::from_secs(5); - let mut bids: Option>> = Some(BTreeMap::new()); - loop { - tokio::select! { - Some((price, peerid)) = bid_rx.recv() => { - match &mut bids { - Some(bids) => { - match bids.get_mut(&price) { - Some(vec) => { - vec.push(peerid); - }, - None => { - bids.insert(price, vec![peerid]); + let future: Pin + Send + '_>> = Box::pin( + async move { + let duration = Duration::from_secs(5); + let mut bids: Option>> = Some(BTreeMap::new()); + loop { + tokio::select! { + Some((price, peerid)) = bid_rx.recv() => { + match &mut bids { + Some(bids) => { + match bids.get_mut(&price) { + Some(vec) => { + vec.push(peerid); + }, + None => { + bids.insert(price, vec![peerid]); + } } - } - }, - None => break Err(BidControllerError::BidsTerminated) + }, + None => break Err(BidControllerError::BidsTerminated) + } } + _ = sleep(duration) => { + break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?)) + } + _ = terminate_rx.recv() => { + break Err(BidControllerError::TaskTerminated); + } + else => break Err(BidControllerError::TaskTerminated) } - _ = sleep(duration) => { - break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?)) - } - _ = terminate_rx.recv() => { - break Err(BidControllerError::TaskTerminated); - } - else => break Err(BidControllerError::TaskTerminated) } - } - }); + }, + ); (Process::new(future, terminate_tx), bid_tx) } diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index d27d713..fe275f5 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -59,17 +59,16 @@ async fn main() -> Result<(), Box> { FieldElement::from_byte_slice_be(private_key.as_slice()).unwrap(), ); - let mut swarm_runner = SwarmRunner::new( + let swarm_runner = SwarmRunner::new( cli.listen_address.parse()?, + cli.dial_addresses + .iter() + .map(|addr| Multiaddr::from_str(addr)) + .collect::, _>>()?, p2p_keypair, Multiaddr::from_str(&cli.address).unwrap(), )?; - cli.dial_addresses - .into_iter() - .try_for_each(|addr| swarm_runner.swarm.dial(Multiaddr::from_str(&addr).unwrap())) - .unwrap(); - let (gossipsub_tx, gossipsub_rx) = mpsc::channel::(100); let (kademlia_tx, kademlia_rx) = mpsc::channel::(100); let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx); diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index cf11c53..8fc35c2 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -56,17 +56,16 @@ async fn main() -> Result<(), Box> { .join("../../"); let bootloader_program_path = ws_root.join("target/bootloader.json"); - let mut swarm_runner = SwarmRunner::new( + let swarm_runner = SwarmRunner::new( cli.listen_address.parse()?, + cli.dial_addresses + .iter() + .map(|addr| Multiaddr::from_str(addr)) + .collect::, _>>()?, p2p_keypair, - Multiaddr::from_str(&cli.address).unwrap(), + Multiaddr::from_str(&cli.address)?, )?; - cli.dial_addresses - .into_iter() - .try_for_each(|addr| swarm_runner.swarm.dial(Multiaddr::from_str(&addr).unwrap())) - .unwrap(); - let (gossipsub_tx, gossipsub_rx) = mpsc::channel::(100); let (kademlia_tx, kademlia_rx) = mpsc::channel::(100); let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx); diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs index 3992b4a..c4b7272 100644 --- a/crates/peer/src/swarm.rs +++ b/crates/peer/src/swarm.rs @@ -23,6 +23,9 @@ pub struct PeerBehaviour { pub struct SwarmRunner { pub swarm: Swarm, + pub listen_multiaddr: Multiaddr, + pub dial_multiaddrs: Vec, + pub p2p_keypair: Keypair, pub p2p_multiaddr: Multiaddr, } @@ -88,12 +91,14 @@ pub enum DelegationMessage { impl SwarmRunner { pub fn new( listen_multiaddr: Multiaddr, + dial_multiaddrs: Vec, p2p_keypair: Keypair, p2p_multiaddr: Multiaddr, ) -> Result> { let mut config = Config::default(); config.set_max_packet_size(1024 * 1024 * 100); - let mut swarm = SwarmBuilder::with_existing_identity(p2p_keypair) + config.set_query_timeout(Duration::from_secs(60)); + let mut swarm = SwarmBuilder::with_existing_identity(p2p_keypair.to_owned()) .with_tokio() .with_tcp( tcp::Config::default().port_reuse(true), @@ -115,7 +120,7 @@ impl SwarmRunner { ), gossipsub: Self::init_gossip(p2p_keypair).unwrap(), })? - .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10))) + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) .build(); swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Networking.as_str()))?; @@ -123,9 +128,11 @@ impl SwarmRunner { swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Delegation.as_str()))?; swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); // swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?; - swarm.listen_on(listen_multiaddr)?; + swarm.listen_on(listen_multiaddr.to_owned())?; - Ok(SwarmRunner { swarm, p2p_multiaddr }) + dial_multiaddrs.iter().try_for_each(|addr| swarm.dial(addr.clone()))?; + + Ok(SwarmRunner { swarm, listen_multiaddr, dial_multiaddrs, p2p_keypair, p2p_multiaddr }) } fn init_gossip( @@ -229,6 +236,9 @@ impl SwarmRunner { if num_established == 0 { self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, endpoint.get_remote_address()); + if let Err(err) = self.swarm.dial(endpoint.get_remote_address().to_owned()) { + error!("Failed to re-dial peer: {err:?}"); + } } } SwarmEvent::Behaviour(PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, result, stats, step })) => { diff --git a/crates/prover/src/lib.rs b/crates/prover/src/lib.rs index 20652e8..a5beb8e 100644 --- a/crates/prover/src/lib.rs +++ b/crates/prover/src/lib.rs @@ -1,4 +1,3 @@ pub mod errors; pub mod stone_prover; -#[allow(async_fn_in_trait)] pub mod traits; diff --git a/crates/prover/src/stone_prover/mod.rs b/crates/prover/src/stone_prover/mod.rs index 358100d..56f4512 100644 --- a/crates/prover/src/stone_prover/mod.rs +++ b/crates/prover/src/stone_prover/mod.rs @@ -124,10 +124,10 @@ pub fn params(n_steps: u64) -> Params { ) .collect(), last_layer_degree_bound, - n_queries: 20, + n_queries: 10, proof_of_work_bits: 30, }, - log_n_cosets: 2, + log_n_cosets: 1, }, ..Default::default() } diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs index bc2c519..4a5f7c2 100644 --- a/crates/runner/src/lib.rs +++ b/crates/runner/src/lib.rs @@ -1,4 +1,3 @@ pub mod cairo_runner; pub mod errors; -#[allow(async_fn_in_trait)] pub mod traits; diff --git a/deployment/kustomization.yaml b/deployment/kustomization.yaml index 67fde47..167ba39 100644 --- a/deployment/kustomization.yaml +++ b/deployment/kustomization.yaml @@ -15,7 +15,7 @@ resources: configMapGenerator: - name: zetina-config literals: - - RUST_LOG=indo + - RUST_LOG=info - CAIRO_PATH=cairo - BOOTLOADER_PATH=bootloader/starknet/simple_bootloader.cairo - BOOTLOADER_OUT_NAME=bootloader.json