Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Event Consumer and Producer #14

Merged
merged 6 commits into from
Aug 16, 2023

Conversation

carlaKC
Copy link
Contributor

@carlaKC carlaKC commented Aug 15, 2023

This PR adds event generation and execution for our simulation. This is done using async channels and a consumer/producer pattern:

  • For each node that we have execution on, we start a consumer thread that will receive events and execute them on the underlying lightning node.
  • For each activity_description that we have, we push out an event to the appropriate consumer at the interval that's requested.

Copy link
Member

@sr-gi sr-gi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a quick fix.

I would get rid of the filtering here. I think the data should have already been sanitized at this point, so we can return a meaningful error to the user.

diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs
index a54021d..38b443d 100644
--- a/sim-cli/src/main.rs
+++ b/sim-cli/src/main.rs
@@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> {
     let config_str = std::fs::read_to_string(cli.config)?;
     let Config { nodes, activity } = serde_json::from_str(&config_str)?;

-    let mut clients: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
+    let mut clients: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>> = HashMap::new();

     for node in nodes {
         let lnd = LndNode::new(node.address, node.macaroon, node.cert).await?;
diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs
index cd0761c..0402c8d 100644
--- a/sim-lib/src/lib.rs
+++ b/sim-lib/src/lib.rs
@@ -2,7 +2,6 @@ use async_trait::async_trait;
 use bitcoin::secp256k1::PublicKey;
 use lightning::ln::PaymentHash;
 use serde::{Deserialize, Serialize};
-use std::marker::Send;
 use std::{
     collections::HashMap,
     sync::{Arc, Mutex},
@@ -29,7 +28,7 @@ pub struct Config {
     pub activity: Vec<ActivityDefinition>,
 }

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
 pub struct ActivityDefinition {
     // The source of the action.
     pub source: PublicKey,
@@ -85,7 +84,7 @@ struct PaymentResult {

 pub struct Simulation {
     // The lightning node that is being simulated.
-    nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
+    nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,

     // The activity that are to be executed on the node.
     activity: Vec<ActivityDefinition>,
@@ -93,7 +92,7 @@ pub struct Simulation {

 impl Simulation {
     pub fn new(
-        nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
+        nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,
         activity: Vec<ActivityDefinition>,
     ) -> Self {
         Self { nodes, activity }
@@ -115,30 +114,22 @@ impl Simulation {
         let mut producer_channels = HashMap::new();
         let mut set = tokio::task::JoinSet::new();

-        for active_node in self.nodes.iter().filter(|k| {
-            for node in self.activity.iter() {
-                if node.source == *k.0 {
-                    return true;
-                }
-            }
-
-            false
-        }) {
+        for (node_id, node) in self.nodes.iter() {
             // For each active node, we'll create a sender and receiver channel to produce and consumer
             // events. We do not buffer channels as we expect events to clear quickly.
             let (sender, receiver) = mpsc::channel(0);

             // Generate a consumer for the receiving end of the channel.
-            set.spawn(consume_events(active_node.1.clone(), receiver));
+            set.spawn(consume_events(node.clone(), receiver));

             // Add the producer channel to our map so that various activity descriptions can use it. We may have multiple
             // activity descriptions that have the same source node.
-            producer_channels.insert(active_node.0, sender);
+            producer_channels.insert(node_id, sender);
         }

-        for description in self.activity {
-            let sender_chan = producer_channels.get(&description.source).unwrap();
-            set.spawn(produce_events(description, sender_chan.clone()));
+        for activity in self.activity.iter() {
+            let sender_chan = producer_channels.get(&activity.source).unwrap();
+            set.spawn(produce_events(*activity, sender_chan.clone()));
         }

         // Wait for threads to exit.
@@ -151,7 +142,10 @@ impl Simulation {
     }
 }

-async fn consume_events(node: Arc<Mutex<dyn LightningNode>>, mut receiver: mpsc::Receiver<Event>) {
+async fn consume_events(
+    node: Arc<Mutex<dyn LightningNode + Send>>,
+    mut receiver: mpsc::Receiver<Event>,
+) {
     //while let Some(event) = receiver.recv().await {}
 }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to make add Send to your trait constraints for nodes (i.e. Arc<Mutex<dyn LightningNode + Send>>>) so the task can be spawned.

@sr-gi
Copy link
Member

sr-gi commented Aug 16, 2023

Or for filtering we can just collect the sources in a HashSet and check if the node has activity:

for (id, node) in self.nodes.iter().filter(|(pk, _)| {
            self.activity
                .iter()
                .map(|a| a.source)
                .collect::<HashSet<PublicKey>>()
                .contains(pk)
        }) 

@carlaKC carlaKC force-pushed the consumer-events branch 7 times, most recently from c66e15a to 082b755 Compare August 16, 2023 11:57
We're going to need to pass these clients off to another thread, so
we add the syn primitives we'll need in a separate commit.
Since we specifically deliver events to the executing node at the time
that they need to run, we don't need the source/offset wrapper anymore.
@carlaKC carlaKC force-pushed the consumer-events branch 2 times, most recently from 586b62f to ea9ee8c Compare August 16, 2023 13:02
@carlaKC carlaKC marked this pull request as ready for review August 16, 2023 13:02
@carlaKC carlaKC changed the title WIP: Event consumer Add Event Consumer and Producer Aug 16, 2023
Comment on lines 171 to 182
let mut join_errs = Vec::new();
while let Some(res) = set.join_next().await {
if let Err(e) = res {
join_errs.push(e)
}
}

if let Some(_) = join_errs.first() {
return Err(anyhow!("Err"));
}

Ok(())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make this a bit more idiomatic:

Suggested change
let mut join_errs = Vec::new();
while let Some(res) = set.join_next().await {
if let Err(e) = res {
join_errs.push(e)
}
}
if let Some(_) = join_errs.first() {
return Err(anyhow!("Err"));
}
Ok(())
let mut results = vec![];
while let Some(res) = set.join_next().await {
results.push(res);
}
(!(results.iter().any(|x| matches!(x, Err(..)))))
.then(|| ())
.ok_or(anyhow!("One of our tasks failed"))

@carlaKC carlaKC force-pushed the consumer-events branch 2 times, most recently from c35aed9 to 47f9cd7 Compare August 16, 2023 13:12
Copy link
Member

@sr-gi sr-gi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

println!("Received shutting down signal. Shutting down");
break;
}
let _ = sender.send(e).await;
Copy link
Collaborator

@okjodom okjodom Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be handling errors / result of sending action to consumer?

Creates a produce_events method that the Simulator can use to parse
ActivityDefinition into Events and send them to the right executors
every interval.
@carlaKC carlaKC merged commit 3d2be24 into bitcoin-dev-project:main Aug 16, 2023
1 check passed
okjodom pushed a commit to okjodom/sim-ln that referenced this pull request Nov 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants