-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: use batch subscribe and remove 4050 #388
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,11 @@ | ||
use { | ||
crate::{ | ||
metrics::Metrics, | ||
spec::{NOTIFY_NOOP_TAG, NOTIFY_NOOP_TTL}, | ||
}, | ||
crate::metrics::Metrics, | ||
relay_client::{error::Error, http::Client}, | ||
relay_rpc::{ | ||
domain::Topic, | ||
rpc::{self, msg_id::get_message_id, Publish, PublishError, SubscriptionError}, | ||
}, | ||
std::{ | ||
sync::{Arc, OnceLock}, | ||
time::{Duration, Instant}, | ||
}, | ||
std::time::{Duration, Instant}, | ||
tokio::time::sleep, | ||
tracing::{error, info, instrument, warn}, | ||
}; | ||
|
@@ -33,7 +27,7 @@ pub async fn publish_relay_message( | |
info!("publish_relay_message"); | ||
let start = Instant::now(); | ||
|
||
let client_publish_call = || async { | ||
let call = || async { | ||
let start = Instant::now(); | ||
let result = relay_client | ||
.publish( | ||
|
@@ -63,7 +57,7 @@ pub async fn publish_relay_message( | |
}; | ||
|
||
let mut tries = 0; | ||
while let Err(e) = client_publish_call().await { | ||
while let Err(e) = call().await { | ||
tries += 1; | ||
let is_permanent = tries >= 10; | ||
if let Some(metrics) = metrics { | ||
|
@@ -104,7 +98,7 @@ pub async fn subscribe_relay_topic( | |
info!("subscribe_relay_topic"); | ||
let start = Instant::now(); | ||
|
||
let client_publish_call = || async { | ||
let call = || async { | ||
let start = Instant::now(); | ||
let result = relay_client.subscribe_blocking(topic.clone()).await; | ||
if let Some(metrics) = metrics { | ||
|
@@ -127,7 +121,7 @@ pub async fn subscribe_relay_topic( | |
}; | ||
|
||
let mut tries = 0; | ||
while let Err(e) = client_publish_call().await { | ||
while let Err(e) = call().await { | ||
tries += 1; | ||
let is_permanent = tries >= 10; | ||
if let Some(metrics) = metrics { | ||
|
@@ -162,25 +156,72 @@ pub async fn subscribe_relay_topic( | |
} | ||
|
||
#[instrument(skip(relay_client, metrics))] | ||
pub async fn extend_subscription_ttl( | ||
pub async fn batch_subscribe_relay_topics( | ||
relay_client: &Client, | ||
topic: Topic, | ||
topics: Vec<Topic>, | ||
metrics: Option<&Metrics>, | ||
) -> Result<(), Error<PublishError>> { | ||
info!("extend_subscription_ttl"); | ||
|
||
// Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime | ||
static LOCK: OnceLock<Arc<str>> = OnceLock::new(); | ||
let message = LOCK.get_or_init(|| "".into()).clone(); | ||
|
||
let publish = Publish { | ||
topic, | ||
message, | ||
tag: NOTIFY_NOOP_TAG, | ||
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32, | ||
prompt: false, | ||
) -> Result<(), Error<SubscriptionError>> { | ||
info!("batch_subscribe_relay_topic"); | ||
let start = Instant::now(); | ||
|
||
let call = || async { | ||
let start = Instant::now(); | ||
let result = relay_client.batch_subscribe_blocking(topics.clone()).await; | ||
if let Some(metrics) = metrics { | ||
metrics.relay_batch_subscribe_request(start); | ||
} | ||
// TODO process each error individually | ||
// TODO retry relay internal failures? | ||
// https://github.com/WalletConnect/notify-server/issues/395 | ||
match result { | ||
Ok(_) => Ok(()), | ||
Err(e) => match e { | ||
Error::Response(rpc::Error::Handler( | ||
SubscriptionError::SubscriberLimitExceeded, | ||
)) => { | ||
// FIXME figure out how to handle this properly; being unable to subscribe means a broken state | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider creating an issue to address this and linking it here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This issue was root caused and no issue should currently exist because Notify Server always subscribes first. Confirmed with logs. Removing this error handling logic in 6e4cb5c as the error should not happen. |
||
// https://walletconnect.slack.com/archives/C058RS0MH38/p1708183383748259 | ||
warn!("Subscriber limit exceeded for topics {topics:?}"); | ||
Ok(()) | ||
} | ||
e => Err(e), | ||
}, | ||
} | ||
}; | ||
publish_relay_message(relay_client, &publish, metrics).await | ||
|
||
let mut tries = 0; | ||
while let Err(e) = call().await { | ||
tries += 1; | ||
let is_permanent = tries >= 10; | ||
if let Some(metrics) = metrics { | ||
metrics.relay_batch_subscribe_failure(is_permanent); | ||
} | ||
|
||
if is_permanent { | ||
error!("Permanent error batch subscribing to topics, took {tries} tries: {e:?}"); | ||
|
||
if let Some(metrics) = metrics { | ||
// TODO make DRY with end-of-function call | ||
metrics.relay_batch_subscribe(false, start); | ||
} | ||
return Err(e); | ||
} | ||
|
||
let retry_in = calculate_retry_in(tries); | ||
warn!( | ||
"Temporary error batch subscribing to topics, retrying attempt {tries} in {retry_in:?}: {e:?}" | ||
); | ||
sleep(retry_in).await; | ||
} | ||
|
||
if let Some(metrics) = metrics { | ||
metrics.relay_batch_subscribe(true, start); | ||
} | ||
|
||
// Sleep to account for some replication lag. Without this, the subscription may not be active on all nodes | ||
sleep(Duration::from_millis(250)).await; | ||
|
||
Ok(()) | ||
} | ||
|
||
#[cfg(test)] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@heilhead