diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 6a48052b..4b88d2cd 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -155,7 +155,7 @@ pub struct QuerySubscription { pub struct ChangeSubscription { entries: HashMap>, - sender: Option>, + sender: mpsc::Sender, permissions: Permissions, } @@ -661,23 +661,18 @@ impl Subscriptions { } }); self.change_subscriptions.retain_mut(|sub| { - if let Some(sender) = &sub.sender { - if sender.is_closed() { - info!("Subscriber gone: removing subscription"); - false - } else { - match &sub.permissions.expired() { - Ok(()) => true, - Err(PermissionError::Expired) => { - info!("Token expired: removing subscription"); - false - } - Err(err) => panic!("Error: {:?}", err), - } - } - } else { + if sub.sender.is_closed() { info!("Subscriber gone: removing subscription"); false + } else { + match &sub.permissions.expired() { + Ok(()) => true, + Err(PermissionError::Expired) => { + info!("Token expired: removing subscription"); + false + } + Err(err) => panic!("Error: {:?}", err), + } } }); } @@ -749,13 +744,11 @@ impl ChangeSubscription { }; if notifications.updates.is_empty() { Ok(()) - } else if let Some(sender) = &self.sender { - match sender.send(notifications).await { + } else { + match &self.sender.send(notifications).await { Ok(()) => Ok(()), Err(_) => Err(NotificationError {}), } - } else { - Err(NotificationError {}) } } else { Ok(()) @@ -792,13 +785,9 @@ impl ChangeSubscription { } notifications }; - if let Some(sender) = &self.sender { - match sender.send(notifications).await { - Ok(()) => Ok(()), - Err(_) => Err(NotificationError {}), - } - } else { - Err(NotificationError {}) + match &self.sender.send(notifications).await { + Ok(()) => Ok(()), + Err(_) => Err(NotificationError {}), } } } @@ -1478,7 +1467,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let (sender, receiver) = mpsc::channel(10); let subscription = ChangeSubscription { entries: valid_entries, - sender: Some(sender), + sender, permissions: self.permissions.clone(), };