Skip to content

Commit 69f0ead

Browse files
committed
enabled queue pumping on backend
1 parent b9f677a commit 69f0ead

File tree

3 files changed

+67
-29
lines changed

3 files changed

+67
-29
lines changed

frontend_api/src/lib.rs

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use axum::{routing::get, Router};
22
use futures_channel::mpsc::{unbounded, UnboundedSender};
3+
use futures_util::stream::SelectNextSome;
34
use futures_util::{SinkExt, StreamExt};
45
use maud::{html, Markup};
56
use messages::DisplayMessage;
@@ -26,7 +27,6 @@ type EventQueues = Arc<Mutex<Queues>>;
2627

2728
static EVENT_QUEUE_ACTIVE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(true);
2829
static TTS_QUEUE_ACTIVE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
29-
static IS_DISPLAYING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
3030

3131
pub struct Queues {
3232
pub events: VecDeque<DisplayMessage>,
@@ -77,21 +77,67 @@ impl FrontendApi {
7777
tokio::spawn(async move {
7878
loop {
7979
let msg = (&mut receiver).recv().await;
80-
handle_message(state.clone(), queue.clone(), msg);
80+
handle_message(state.clone(), queue.clone(), msg).await;
8181
}
8282
});
8383

8484
// Process the Queues on a new thread
8585

86-
//tokio::spawn(async move {
87-
// loop {
88-
// let mut queues = message_queue_arc.lock().unwrap();
89-
// if !queues.events.is_empty() {
90-
// let message = queues.events.pop_front();
91-
// handle_message(connection_state.clone(), message_queue_arc.clone(), message);
92-
// }
93-
// }
94-
//});
86+
let queue_connection_state = connection_state.clone();
87+
tokio::spawn(async move {
88+
loop {
89+
let message = {
90+
let mut queues = message_queue_arc.lock().unwrap();
91+
queues.events.pop_front()
92+
};
93+
94+
let Some(message) = message else {
95+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
96+
continue;
97+
};
98+
//Make html message to send to frontend
99+
//<div id="alerts" hx-swap-oob="true">
100+
let html_message = html! {
101+
div id="notifications" hx-swap="afterend" hx-target="notifications" {
102+
h1 { (message.message) }
103+
img src=(message.image_url) {}
104+
}
105+
};
106+
{
107+
let mut websocket_state = queue_connection_state.lock().unwrap();
108+
for (&addr, tx) in websocket_state.iter_mut() {
109+
if tx
110+
.unbounded_send(Message::Text(html_message.clone().into()))
111+
.is_err()
112+
{
113+
println!("closing websocket message to: {} ==========", addr);
114+
}
115+
}
116+
}
117+
118+
//Pause for a bit to allow the message to be displayed
119+
tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
120+
121+
let html_message = html! {
122+
div id="notifications" hx-swap="delete" hx-target="notifications" {
123+
}
124+
};
125+
{
126+
let mut websocket_state = queue_connection_state.lock().unwrap();
127+
for (&addr, tx) in websocket_state.iter_mut() {
128+
if tx
129+
.unbounded_send(Message::Text(html_message.clone().into()))
130+
.is_err()
131+
{
132+
println!("closing websocket message to: {} ==========", addr);
133+
}
134+
}
135+
}
136+
137+
//Pause a bit before running queue again
138+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
139+
}
140+
});
95141

96142
let https_address = self.http_address.clone();
97143
tokio::spawn(async move {
@@ -144,7 +190,7 @@ async fn admin() -> AdminTemplate {
144190
AdminTemplate {}
145191
}
146192

147-
fn handle_message(
193+
async fn handle_message(
148194
connection_state: ConnectionMap,
149195
event_queues: EventQueues,
150196
message: Option<DisplayMessage>,
@@ -162,22 +208,6 @@ fn handle_message(
162208

163209
queues.events.push_back(message.clone());
164210
}
165-
166-
//Make html message to send to frontend
167-
//<div id="alerts" hx-swap-oob="true">
168-
let trigger = format!("delay:{}ms", message.display_time);
169-
let html_message = html! {
170-
div id="notifications" hx-swap="afterend" hx-target="notifications" ws-send="done" hx-trigger=(trigger) {
171-
h1 { (message.message) }
172-
img src=(message.image_url) {}
173-
}
174-
};
175-
if tx
176-
.unbounded_send(Message::Text(html_message.clone().into()))
177-
.is_err()
178-
{
179-
println!("closing websocket message to: {} ==========", addr);
180-
}
181211
}
182212
}
183213
None => panic!("Error receiving message"),

frontend_api/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ async fn main() {
2121
//TODO: write some test something to send a message to the receiver
2222

2323
loop {
24+
println!("Sending message");
2425
let display_message = messages::DisplayMessage {
2526
message: "hello from htmx".to_string(),
2627
image_url: "".to_string(),
@@ -34,6 +35,6 @@ async fn main() {
3435

3536
let _ = tx.send(display_message).unwrap();
3637

37-
tokio::time::sleep(tokio::time::Duration::from_secs(20)).await;
38+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
3839
}
3940
}

frontend_api/templates/index.html

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,11 @@
1919
</main>
2020
</body>
2121

22+
<script>
23+
htmx.on("htmx:wsBeforeMessage", function (event) {
24+
console.log("wsBeforeMessage", event);
25+
<!-- do our own thing -->
26+
});
27+
</script>
28+
2229
</html>

0 commit comments

Comments
 (0)