Skip to content

Commit

Permalink
fix(CompositeDevice): use async target attach request to prevent dead…
Browse files Browse the repository at this point in the history
…lock
  • Loading branch information
ShadowApex committed Feb 14, 2025
1 parent 31fd563 commit a748938
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 63 deletions.
97 changes: 40 additions & 57 deletions src/input/composite_device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,49 +273,14 @@ impl CompositeDevice {

/// Starts the [CompositeDevice] and listens for events from all source
/// devices to translate the events and send them to the appropriate target.
pub async fn run(
&mut self,
targets: HashMap<String, TargetDeviceClient>,
) -> Result<(), Box<dyn Error>> {
pub async fn run(&mut self) -> Result<(), Box<dyn Error>> {
log::debug!("Starting composite device");

let dbus_path = self.dbus_path.clone();

// Start all source devices
self.run_source_devices().await?;

// Keep track of all target devices
for (path, target) in targets.iter() {
if let Err(e) = target.set_composite_device(self.client()).await {
return Err(
format!("Failed to set composite device for target device: {:?}", e).into(),
);
}

// Query the target device for its capabilities
let caps = match target.get_capabilities().await {
Ok(caps) => caps,
Err(e) => {
return Err(format!("Failed to get target capabilities: {e:?}").into());
}
};

// Track the target device by capabilities it has
for cap in caps {
self.target_devices_by_capability
.entry(cap)
.and_modify(|devices| {
devices.insert(path.clone());
})
.or_insert_with(|| {
let mut devices = HashSet::new();
devices.insert(path.clone());
devices
});
}
}
self.target_devices = targets;

// Loop and listen for command events
log::debug!("CompositeDevice started");
let mut buffer = Vec::with_capacity(BUFFER_SIZE);
Expand Down Expand Up @@ -612,6 +577,7 @@ impl CompositeDevice {
if let Err(e) = hide_device(source_path.as_str()).await {
log::warn!("Failed to hide device '{source_path}': {e:?}");
}
log::debug!("Finished hiding device: {source_path}");
}

log::debug!("Starting new source devices");
Expand Down Expand Up @@ -661,7 +627,7 @@ impl CompositeDevice {
log::trace!("Blocking event! {:?}", raw_event);
return Ok(());
}
//log::trace!("Received event: {:?} from {device_id}", raw_event);
log::trace!("Received event: {:?} from {device_id}", raw_event);

// Convert the event into a NativeEvent
let event: NativeEvent = match raw_event {
Expand Down Expand Up @@ -1860,6 +1826,7 @@ impl CompositeDevice {

// Create new target devices using the input manager
for kind in device_types_to_start {
// Ask the input manager to create a target device
log::debug!("Requesting to create device: {kind}");
let (sender, mut receiver) = mpsc::channel(1);
self.manager
Expand All @@ -1878,29 +1845,44 @@ impl CompositeDevice {
}
};

// Attach the target device
// Ask the input manager to attach the target device to this composite
// device. Note that this *must* be run in an async task to prevent
// deadlocking.
log::debug!("Requesting to attach target device {target_path} to {composite_path}");
let (sender, mut receiver) = mpsc::channel(1);
self.manager
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path.clone(),
composite_path: composite_path.clone(),
sender,
})
.await?;
let Some(response) = receiver.recv().await else {
log::warn!("Channel closed waiting for response from input manager");
continue;
};
if let Err(e) = response {
log::error!("Failed to attach target device: {e:?}");
}
let manager = self.manager.clone();
let target_path_clone = target_path.clone();
let composite_path_clone = composite_path.clone();
tokio::task::spawn(async move {
let (sender, mut receiver) = mpsc::channel(1);
let result = manager
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path_clone,
composite_path: composite_path_clone,
sender,
})
.await;
if let Err(e) = result {
log::warn!(
"Failed to send attach request to input manager: {}",
e.to_string()
);
return;
}
let Some(response) = receiver.recv().await else {
log::warn!("Channel closed waiting for response from input manager");
return;
};
if let Err(e) = response {
log::error!("Failed to attach target device: {e:?}");
}
});

// Enqueue the target device to wait for the attachment message from
// the input manager to prevent multiple calls to set_target_devices()
// from mangling attachment.
self.target_devices_queued.insert(target_path);
}

// Signal change in target devices to DBus
// TODO: Check this
//self.signal_targets_changed().await;
Expand Down Expand Up @@ -1983,10 +1965,6 @@ impl CompositeDevice {
}
log::debug!("Attached device {path} to {dbus_path}");

// Add the target device
self.target_devices_queued.remove(&path);
self.target_devices.insert(path.clone(), target);

// Track the target device by capabilities it has
for cap in caps {
self.target_devices_by_capability
Expand All @@ -2000,7 +1978,12 @@ impl CompositeDevice {
devices
});
}

// Add the target device
self.target_devices_queued.remove(&path);
self.target_devices.insert(path.clone(), target);
}

// TODO: check this
//self.signal_targets_changed().await;

Expand Down
7 changes: 4 additions & 3 deletions src/input/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ impl Manager {
pub async fn run(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
// Delay initial discovery by a short amount of time to allow udev
// rules to process for the first time.
tokio::time::sleep(Duration::from_millis(1000)).await;
// TODO: Figure out a better way to prevent udev from not running hiding
// rules too early in boot.
tokio::time::sleep(Duration::from_millis(4000)).await;

let dbus_for_listen_on_dbus = self.dbus.clone();

Expand Down Expand Up @@ -793,8 +795,7 @@ impl Manager {
let composite_path_clone = composite_path.clone();
let tx = self.tx.clone();
let task = tokio::spawn(async move {
let targets = HashMap::new();
if let Err(e) = device.run(targets).await {
if let Err(e) = device.run().await {
log::error!("Error running {composite_path}: {}", e.to_string());
}
log::debug!("Composite device stopped running: {composite_path}");
Expand Down
6 changes: 4 additions & 2 deletions src/udev/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,9 @@ impl Device {
let match_rule = match subsystem.as_str() {
"hidraw" => {
let name = self.name.clone();
Some(format!(r#"SUBSYSTEMS=="{subsystem}", KERNEL=="{name}""#))
Some(format!(
r#"ACTION=="add|change", SUBSYSTEMS=="{subsystem}", KERNEL=="{name}""#
))
}
"input" => {
let rule_fn = || {
Expand All @@ -755,7 +757,7 @@ impl Device {
let pid = self.get_product_id()?;

Some(format!(
r#"SUBSYSTEMS=="{subsystem}", KERNELS=="{device_name}", ATTRS{{id/vendor}}=="{vid}", ATTRS{{id/product}}=="{pid}""#
r#"ACTION=="add|change", SUBSYSTEMS=="{subsystem}", KERNELS=="{device_name}", ATTRS{{id/vendor}}=="{vid}", ATTRS{{id/product}}=="{pid}""#
))
};
rule_fn()
Expand Down
3 changes: 2 additions & 1 deletion src/udev/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ pub async fn hide_device(path: &str) -> Result<(), Box<dyn Error>> {
{match_rule}, GOTO="inputplumber_valid"
GOTO="inputplumber_end"
LABEL="inputplumber_valid"
KERNEL=="hidraw[0-9]*|js[0-9]*|event[0-9]*", SUBSYSTEM=="{subsystem}", MODE="000", GROUP="root", RUN:="{chmod_cmd} 000 {path}", SYMLINK+="inputplumber/by-hidden/%k"
KERNEL=="js[0-9]*|event[0-9]*", SUBSYSTEM=="{subsystem}", MODE:="0000", GROUP:="root", RUN:="{chmod_cmd} 000 /dev/input/%k", SYMLINK+="inputplumber/by-hidden/%k"
KERNEL=="hidraw[0-9]*", SUBSYSTEM=="{subsystem}", MODE:="0000", GROUP:="root", RUN:="{chmod_cmd} 000 /dev/%k", SYMLINK+="inputplumber/by-hidden/%k"
LABEL="inputplumber_end"
"#
);
Expand Down

0 comments on commit a748938

Please sign in to comment.