-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Event Consumer and Producer #14
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a quick fix.
I would get rid of the filtering here. I think the data should have already been sanitized at this point, so we can return a meaningful error to the user.
diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs
index a54021d..38b443d 100644
--- a/sim-cli/src/main.rs
+++ b/sim-cli/src/main.rs
@@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> {
let config_str = std::fs::read_to_string(cli.config)?;
let Config { nodes, activity } = serde_json::from_str(&config_str)?;
- let mut clients: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
+ let mut clients: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>> = HashMap::new();
for node in nodes {
let lnd = LndNode::new(node.address, node.macaroon, node.cert).await?;
diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs
index cd0761c..0402c8d 100644
--- a/sim-lib/src/lib.rs
+++ b/sim-lib/src/lib.rs
@@ -2,7 +2,6 @@ use async_trait::async_trait;
use bitcoin::secp256k1::PublicKey;
use lightning::ln::PaymentHash;
use serde::{Deserialize, Serialize};
-use std::marker::Send;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
@@ -29,7 +28,7 @@ pub struct Config {
pub activity: Vec<ActivityDefinition>,
}
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ActivityDefinition {
// The source of the action.
pub source: PublicKey,
@@ -85,7 +84,7 @@ struct PaymentResult {
pub struct Simulation {
// The lightning node that is being simulated.
- nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
+ nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,
// The activity that are to be executed on the node.
activity: Vec<ActivityDefinition>,
@@ -93,7 +92,7 @@ pub struct Simulation {
impl Simulation {
pub fn new(
- nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
+ nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,
activity: Vec<ActivityDefinition>,
) -> Self {
Self { nodes, activity }
@@ -115,30 +114,22 @@ impl Simulation {
let mut producer_channels = HashMap::new();
let mut set = tokio::task::JoinSet::new();
- for active_node in self.nodes.iter().filter(|k| {
- for node in self.activity.iter() {
- if node.source == *k.0 {
- return true;
- }
- }
-
- false
- }) {
+ for (node_id, node) in self.nodes.iter() {
// For each active node, we'll create a sender and receiver channel to produce and consumer
// events. We do not buffer channels as we expect events to clear quickly.
let (sender, receiver) = mpsc::channel(0);
// Generate a consumer for the receiving end of the channel.
- set.spawn(consume_events(active_node.1.clone(), receiver));
+ set.spawn(consume_events(node.clone(), receiver));
// Add the producer channel to our map so that various activity descriptions can use it. We may have multiple
// activity descriptions that have the same source node.
- producer_channels.insert(active_node.0, sender);
+ producer_channels.insert(node_id, sender);
}
- for description in self.activity {
- let sender_chan = producer_channels.get(&description.source).unwrap();
- set.spawn(produce_events(description, sender_chan.clone()));
+ for activity in self.activity.iter() {
+ let sender_chan = producer_channels.get(&activity.source).unwrap();
+ set.spawn(produce_events(*activity, sender_chan.clone()));
}
// Wait for threads to exit.
@@ -151,7 +142,10 @@ impl Simulation {
}
}
-async fn consume_events(node: Arc<Mutex<dyn LightningNode>>, mut receiver: mpsc::Receiver<Event>) {
+async fn consume_events(
+ node: Arc<Mutex<dyn LightningNode + Send>>,
+ mut receiver: mpsc::Receiver<Event>,
+) {
//while let Some(event) = receiver.recv().await {}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to make add Send
to your trait constraints for nodes (i.e. Arc<Mutex<dyn LightningNode + Send>>>)
so the task can be spawned.
Or for filtering we can just collect the sources in a
|
c66e15a
to
082b755
Compare
We're going to need to pass these clients off to another thread, so we add the syn primitives we'll need in a separate commit.
Since we specifically deliver events to the executing node at the time that they need to run, we don't need the source/offset wrapper anymore.
586b62f
to
ea9ee8c
Compare
sim-lib/src/lib.rs
Outdated
let mut join_errs = Vec::new(); | ||
while let Some(res) = set.join_next().await { | ||
if let Err(e) = res { | ||
join_errs.push(e) | ||
} | ||
} | ||
|
||
if let Some(_) = join_errs.first() { | ||
return Err(anyhow!("Err")); | ||
} | ||
|
||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can make this a bit more idiomatic:
let mut join_errs = Vec::new(); | |
while let Some(res) = set.join_next().await { | |
if let Err(e) = res { | |
join_errs.push(e) | |
} | |
} | |
if let Some(_) = join_errs.first() { | |
return Err(anyhow!("Err")); | |
} | |
Ok(()) | |
let mut results = vec![]; | |
while let Some(res) = set.join_next().await { | |
results.push(res); | |
} | |
(!(results.iter().any(|x| matches!(x, Err(..))))) | |
.then(|| ()) | |
.ok_or(anyhow!("One of our tasks failed")) |
c35aed9
to
47f9cd7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
sim-lib/src/lib.rs
Outdated
println!("Received shutting down signal. Shutting down"); | ||
break; | ||
} | ||
let _ = sender.send(e).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we be handling errors / result of sending action to consumer?
47f9cd7
to
2fc0f57
Compare
Creates a produce_events method that the Simulator can use to parse ActivityDefinition into Events and send them to the right executors every interval.
2fc0f57
to
3edb420
Compare
Add Event Consumer and Producer
This PR adds event generation and execution for our simulation. This is done using async channels and a consumer/producer pattern:
node
that we have execution on, we start aconsumer
thread that will receive events and execute them on the underlying lightning node.activity_description
that we have, we push out an event to the appropriate consumer at the interval that's requested.