Skip to content

Commit

Permalink
Improve thread panic handling.
Browse files Browse the repository at this point in the history
Use a drop impl alongside panic unwinding to detect when a managed
thread exits.

Refactor the program manager into a standalone function for legibility.
  • Loading branch information
arusahni committed Dec 26, 2023
1 parent ceb5bc9 commit ccdcc30
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 85 deletions.
213 changes: 129 additions & 84 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ use std::{
fs::{create_dir_all, remove_file},
io::{Read, Write},
os::unix::net::{UnixListener, UnixStream},
process,
sync::{mpsc, Arc, RwLock},
panic, process,
sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc, RwLock,
},
thread,
time::Duration,
};

use clap::Parser;
use directories::ProjectDirs;
use tracing::{debug, info, trace, warn};
use tracing::{debug, error, info, trace, warn};

use errors::Error;
use tracing_subscriber::{
Expand All @@ -31,6 +34,17 @@ enum StreamState {
Exists(UnixStream),
}

struct Sentinel {
flag: Arc<AtomicBool>,
}

impl Drop for Sentinel {
fn drop(&mut self) {
self.flag.store(true, Ordering::SeqCst);
warn!("A thread has terminated or panicked");
}
}

/// Get (and possibly create) a socket for the given instance.
fn get_socket(instance_name: &str) -> Result<StreamState, Error> {
let dir = match ProjectDirs::from("net", "arusahni", "qurop")
Expand Down Expand Up @@ -125,102 +139,133 @@ pub(crate) struct Context {
pub window_id: Option<u32>,
}

fn run(listener: UnixListener, instance: Instance) {
let (tx, rx) = mpsc::channel::<String>();
let ctx = Arc::new(RwLock::new(Context {
matcher: instance.matcher.clone(),
window_id: None,
}));
let program_ctx = Arc::clone(&ctx);
let program_manager = thread::spawn(move || {
let mut program = process::Command::new("sh")
.arg("-c")
.arg(instance.command.clone())
.spawn()
.expect("failed to start");
info!("[{}] Started PID: {}", instance.name, program.id());
{
let ctx = &mut program_ctx.write().unwrap();
if matches!(ctx.matcher, x11::WindowMatcher::ProcessId(_)) {
ctx.matcher = x11::WindowMatcher::ProcessId(Some(program.id()));
trace!("[{}] Set a new PID {}", instance.name, program.id());
}
ctx.window_id = Some(block_for_window(&ctx.matcher));
trace!(
"[{}] Set a new Window ID {:?}",
instance.name,
ctx.window_id
);
pub(crate) fn program_thread(
rx: mpsc::Receiver<String>,
instance: Instance,
ctx: &Arc<RwLock<Context>>,
) {
let mut program = process::Command::new("sh")
.arg("-c")
.arg(instance.command.clone())
.spawn()
.expect("failed to start");
info!("[{}] Started PID: {}", instance.name, program.id());
{
let write_ctx = &mut ctx.write().unwrap();
if matches!(write_ctx.matcher, x11::WindowMatcher::ProcessId(_)) {
write_ctx.matcher = x11::WindowMatcher::ProcessId(Some(program.id()));
trace!("[{}] Set a new PID {}", instance.name, program.id());
}
loop {
if let Ok(msg) = rx.recv() {
let action = if msg == "toggle" {
let win_id = program_ctx.read().unwrap().window_id;
if win_id.is_some() && x11::window_is_active(win_id.unwrap()) {
"hide".into()
} else {
"open".into()
}
write_ctx.window_id = Some(block_for_window(&write_ctx.matcher));
trace!(
"[{}] Set a new Window ID {:?}",
instance.name,
write_ctx.window_id
);
}
loop {
if let Ok(msg) = rx.recv() {
let action = if msg == "toggle" {
let win_id = ctx.read().unwrap().window_id;
if win_id.is_some() && x11::window_is_active(win_id.unwrap()) {
"hide".into()
} else {
msg.clone()
};
debug!("[{}] Taking action: '{}'", instance.name, action);
match action.as_str() {
"open" => {
let ctx = program_ctx.clone();
if let Ok(Some(status)) = program.try_wait() {
info!(
"[{}] Program has exited ({}). Restarting.",
instance.name, status
);
let mut ctx_writer = ctx.write().unwrap();
program = process::Command::new("sh")
.arg("-c")
.arg(instance.command.clone())
.spawn()
.expect("failed to start");
if matches!(ctx_writer.matcher, x11::WindowMatcher::ProcessId(_)) {
trace!("[{}] Setting new pid {}", instance.name, program.id());
ctx_writer.matcher =
x11::WindowMatcher::ProcessId(Some(program.id()));
}
ctx_writer.window_id = Some(block_for_window(&ctx_writer.matcher));
} else {
let window_id = ctx.read().unwrap().window_id.unwrap();
x11::map_window(window_id);
x11::position_window(window_id);
}
}
"kill" => {
info!("[{}] Killing", instance.name);
program.kill().unwrap();
break;
}
"hide" => {
let ctx = program_ctx.clone();
let lock = ctx.read().unwrap();
match lock.window_id {
Some(window_id) => x11::unmap_window(window_id),
None => x11::unmap_qurop_window(&lock.matcher),
"open".into()
}
} else {
msg.clone()
};
debug!("[{}] Taking action: '{}'", instance.name, action);
match action.as_str() {
"open" => {
let read_ctx = ctx.clone();
if let Ok(Some(status)) = program.try_wait() {
info!(
"[{}] Program has exited ({}). Restarting.",
instance.name, status
);
let mut write_ctx = read_ctx.write().unwrap();
program = process::Command::new("sh")
.arg("-c")
.arg(instance.command.clone())
.spawn()
.expect("failed to start");
if matches!(write_ctx.matcher, x11::WindowMatcher::ProcessId(_)) {
trace!("[{}] Setting new pid {}", instance.name, program.id());
write_ctx.matcher = x11::WindowMatcher::ProcessId(Some(program.id()));
}
write_ctx.window_id = Some(block_for_window(&write_ctx.matcher));
} else {
let window_id = read_ctx.read().unwrap().window_id.unwrap();
x11::map_window(window_id);
x11::position_window(window_id);
}
command if command.starts_with("hide:") => {
x11::unmap_window(command.split(':').last().unwrap().parse().unwrap())
}
"kill" => {
info!("[{}] Killing", instance.name);
program.kill().unwrap();
break;
}
"hide" => {
let local_ctx = ctx.clone();
let read_ctx = local_ctx.read().unwrap();
match read_ctx.window_id {
Some(window_id) => x11::unmap_window(window_id),
None => x11::unmap_qurop_window(&read_ctx.matcher),
}
_ => info!("[{}] Unknown: '{}' ({})", instance.name, msg, action),
}
command if command.starts_with("hide:") => {
x11::unmap_window(command.split(':').last().unwrap().parse().unwrap())
}
_ => info!("[{}] Unknown: '{}' ({})", instance.name, msg, action),
}
}
}
}

fn run(listener: UnixListener, instance: Instance) {
let flag = Arc::new(AtomicBool::new(false));
let (tx, rx) = mpsc::channel::<String>();
let ctx = Arc::new(RwLock::new(Context {
matcher: instance.matcher.clone(),
window_id: None,
}));
let program_ctx = Arc::clone(&ctx);
let program_flag = Arc::clone(&flag);
let _program_manager = thread::spawn(move || {
let _sentinel = Sentinel { flag: program_flag };
match panic::catch_unwind(|| {
program_thread(rx, instance, &program_ctx);
}) {
Ok(()) => info!("Program thread completed"),
Err(err) => error!(?err, "Program thread panicked"),
};
});
let socket_tx = tx.clone();
let socket_flag = Arc::clone(&flag);
let _socket_manager = thread::spawn(|| {
handle_socket_messages(listener, socket_tx).unwrap();
let _sentinel = Sentinel { flag: socket_flag };
match panic::catch_unwind(|| {
handle_socket_messages(listener, socket_tx).unwrap();
}) {
Ok(()) => info!("Socket thread completed"),
Err(err) => error!(?err, "Socket thread panicked"),
};
});
let window_ctx = Arc::clone(&ctx);
let wm_flag = Arc::clone(&flag);
let _window_server_manager = thread::spawn(move || {
x11::handle_window(tx, &window_ctx);
let _sentinel = Sentinel { flag: wm_flag };
match panic::catch_unwind(|| {
x11::handle_window(tx, &window_ctx);
}) {
Ok(()) => info!("WM thread completed"),
Err(err) => error!(?err, "WM thread panicked"),
};
});
program_manager.join().unwrap();
while !flag.load(Ordering::SeqCst) {
thread::sleep(Duration::from_millis(500));
}
}

#[derive(Debug, Clone)]
Expand Down
6 changes: 5 additions & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ use colored::*;
use std::process;

pub(crate) fn abort(message: &str) -> ! {
eprintln!("{}", message.red());
print_error(message);
process::exit(1);
}

pub(crate) fn print_error(message: &str) {
eprintln!("{}", message.red());
}

0 comments on commit ccdcc30

Please sign in to comment.