Skip to content

Commit

Permalink
Watch for updates
Browse files Browse the repository at this point in the history
  • Loading branch information
timmo001 committed Jun 8, 2024
1 parent f331f9d commit e07f104
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 37 deletions.
72 changes: 52 additions & 20 deletions src-tauri/src/modules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ mod sensors;
mod system;

use crate::shared::get_data_path;
use log::{error, info};
use notify::{recommended_watcher, RecursiveMode, Watcher};
use log::{debug, error, info};
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;
use std::str::FromStr;
use std::{fmt, path::Path};

#[derive(Debug, Serialize, Deserialize)]
pub enum Module {
Expand Down Expand Up @@ -203,30 +203,62 @@ pub async fn update_modules(modules: &Vec<Module>) -> Result<(), String> {
Ok(())
}

pub fn watch_modules(
pub async fn watch_modules(
modules: &Vec<String>,
callback_fn: fn(&Module, &Value),
) -> Result<(), String> {
info!("Watching modules: {:?}", modules);
let path = get_modules_path();
info!("Watching modules in '{:?}': {:?}", path, modules);

// Watch for changes in module data files
let watcher_result = recommended_watcher(|res| match res {
Ok(event) => {
info!("event: {:?}", event);
}
Err(e) => {
error!("watch error: {:?}", e);
}
});
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default()).unwrap();
watcher
.watch(path.as_ref(), RecursiveMode::NonRecursive)
.unwrap();

if watcher_result.is_err() {
return Err("Failed to create watcher".to_string());
}
for res in rx {
match res {
Ok(event) => {
debug!("Watcher event: {:?}", event);

let mut watcher = watcher_result.unwrap();
for path in &event.paths {
for module in modules {
// Check if path ends with module filename
if path.ends_with(&format!("{}.json", module)) {
match Module::from_str(module) {
Ok(module) => {
debug!("Module: {:?}", module);

match watcher.watch(Path::new(&get_modules_path()), RecursiveMode::NonRecursive) {
Ok(_) => Ok(()),
Err(e) => Err(format!("Failed to watch modules: {:?}", e)),
// Get module data
let data = match get_module_data(&module).await {
Ok(data) => data,
Err(e) => {
error!(
"Failed to get updated '{:?}' module data: {:?}",
module, e
);
continue;
}
};

// Call callback function
callback_fn(&module, &data);
}
Err(e) => {
error!("Invalid module: {:?}", e);
continue;
}
}
}
}
}
}
Err(e) => {
println!("watch error: {:?}", e);
}
}
}

Ok(())
}
46 changes: 29 additions & 17 deletions src-tauri/src/websocket/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ use log::{debug, error, info, warn};
use rocket::get;
use rocket_ws::{Message, Stream, WebSocket};
use serde_json::Value;
use std::str::FromStr;
use std::sync::Mutex;
use std::{str::FromStr, thread};
use tokio::runtime::Runtime;

#[get("/api/websocket")]
pub async fn websocket(ws: WebSocket) -> Stream!['static] {
// Create multiple threads to handle the different tasks
let mut tasks: Vec<thread::JoinHandle<()>> = vec![];

Stream! { ws =>
for await msg in ws {
// Get the message
Expand Down Expand Up @@ -165,24 +168,33 @@ pub async fn websocket(ws: WebSocket) -> Stream!['static] {
}

let request_data = request_data_result.unwrap();
// let modules = request_data.modules;
info!("Register data listener for modules: {:?}", request_data.modules);

// Listen for data updates for the requested modules on another thread
tokio::spawn(async move {
watch_modules(&request_data.modules, |module, data| {
// Send data update to the client
info!("Data update for module: {:?} - {:?}", module, data);

// yield Message::text(serde_json::to_string(&WebsocketResponse {
// id: request_id.clone(),
// type_: EventType::DataUpdate.to_string(),
// data: data.clone(),
// subtype: None,
// message: None,
// module: Some(module.to_string()),
// }).unwrap());
}).unwrap();
});
tasks.push(
thread::Builder::new()
.name(format!("listener_{}", request_id).into())
.spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
watch_modules(&request_data.modules, |module, data| {
// Send data update to the client
info!("Data update for module: {:?} - {:?}", module, data);

// yield Message::text(serde_json::to_string(&WebsocketResponse {
// id: request_id.clone(),
// type_: EventType::DataUpdate.to_string(),
// data: data.clone(),
// subtype: None,
// message: None,
// module: Some(module.to_string()),
// }).unwrap());
}).await.unwrap();
});
})
.unwrap(),
);

yield Message::text(serde_json::to_string(&WebsocketResponse {
id: request_id.clone(),
Expand Down

0 comments on commit e07f104

Please sign in to comment.