diff --git a/src-tauri/src/api.rs b/src-tauri/src/api.rs index bd3e2e4d..f76a771c 100644 --- a/src-tauri/src/api.rs +++ b/src-tauri/src/api.rs @@ -139,18 +139,23 @@ pub async fn join( "id": id_clone }); - seventv.subscribe("cosmetic.create", &channel_cond).await; - seventv.subscribe("entitlement.create", &channel_cond).await; + seventv + .subscribe(&login_clone, "cosmetic.create", &channel_cond) + .await; + + seventv + .subscribe(&login_clone, "entitlement.create", &channel_cond) + .await; if let Some(ref set_id) = set_id { seventv - .subscribe("emote_set.*", &json!({ "object_id": set_id })) + .subscribe(&login_clone, "emote_set.*", &json!({ "object_id": set_id })) .await; } if let Some(ref stv_id) = stv_id { seventv - .subscribe("user.update", &json!({ "object_id": stv_id })) + .subscribe(&login_clone, "user.update", &json!({ "object_id": stv_id })) .await; } } @@ -174,7 +179,7 @@ pub async fn leave(state: State<'_, Mutex>, channel: String) -> Result } if let Some(ref seventv) = state.seventv { - seventv.unsubscribe_all().await; + seventv.unsubscribe_all(&channel).await; } if let Some(ref irc) = state.irc { diff --git a/src-tauri/src/seventv/client.rs b/src-tauri/src/seventv/client.rs index 8a9b07fc..3fe01693 100644 --- a/src-tauri/src/seventv/client.rs +++ b/src-tauri/src/seventv/client.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use anyhow::anyhow; +use futures::future::join_all; use futures::{SinkExt, StreamExt}; use serde::Deserialize; use serde_json::json; @@ -111,7 +112,7 @@ impl SeventTvClient { } #[tracing::instrument(name = "7tv_subscribe", skip(self, condition), fields(%condition))] - pub async fn subscribe(&self, event: &str, condition: &serde_json::Value) { + pub async fn subscribe(&self, channel: &str, event: &str, condition: &serde_json::Value) { let payload = json!({ "op": 35, "d": { @@ -126,7 +127,7 @@ impl SeventTvClient { { Ok(_) => { let mut subscriptions = self.subscriptions.lock().await; - subscriptions.insert(event.to_string(), condition.clone()); + subscriptions.insert(format!("{channel}:{event}"), condition.clone()); tracing::trace!("Subscription created"); } @@ -136,10 +137,10 @@ impl SeventTvClient { } } - pub async fn unsubscribe(&self, event: &str) { + pub async fn unsubscribe(&self, channel: &str, event: &str) { let mut subscriptions = self.subscriptions.lock().await; - if let Some(condition) = subscriptions.remove(event) { + if let Some(condition) = subscriptions.remove(&format!("{channel}:{event}")) { let payload = json!({ "op": 36, "d": { @@ -154,21 +155,21 @@ impl SeventTvClient { } } - pub async fn unsubscribe_all(&self) { - let mut subscriptions = self.subscriptions.lock().await; + pub async fn unsubscribe_all(&self, channel: &str) { + let prefix = format!("{channel}:"); - for (event, condition) in subscriptions.drain() { - let payload = json!({ - "op": 36, - "d": { - "type": event, - "condition": condition - } - }); + let events = { + let subscriptions = self.subscriptions.lock().await; - let _ = self - .message_tx - .send(Message::Text(payload.to_string().into())); - } + subscriptions + .keys() + .filter(|k| k.starts_with(&prefix)) + .map(|k| k.strip_prefix(&prefix).unwrap().to_string()) + .collect::>() + }; + + let futures = events.iter().map(|event| self.unsubscribe(channel, event)); + + join_all(futures).await; } } diff --git a/src-tauri/src/seventv/mod.rs b/src-tauri/src/seventv/mod.rs index 6209aba8..888c0ea9 100644 --- a/src-tauri/src/seventv/mod.rs +++ b/src-tauri/src/seventv/mod.rs @@ -52,6 +52,7 @@ pub async fn connect_seventv( #[tauri::command] pub async fn resub_emote_set( state: State<'_, Mutex>, + channel: String, set_id: String, ) -> Result<(), Error> { let state = state.lock().await; @@ -60,9 +61,9 @@ pub async fn resub_emote_set( return Ok(()); }; - seventv.unsubscribe("emote_set.*").await; + seventv.unsubscribe(&channel, "emote_set.*").await; seventv - .subscribe("emote_set.*", &json!({ "object_id": set_id })) + .subscribe(&channel, "emote_set.*", &json!({ "object_id": set_id })) .await; Ok(()) diff --git a/src/lib/handlers/seventv/user-update.ts b/src/lib/handlers/seventv/user-update.ts index a7db423d..5f2c7504 100644 --- a/src/lib/handlers/seventv/user-update.ts +++ b/src/lib/handlers/seventv/user-update.ts @@ -40,7 +40,10 @@ export default defineHandler({ }; await channel.emotes.fetch7tv(); - await invoke("resub_emote_set", { setId: channel.emoteSetId }); + await invoke("resub_emote_set", { + channel: channel.user.username, + setId: channel.emoteSetId, + }); } channel.chat.addMessage(message);