fix: don't block startup on subscribe #359
Changes from all commits
@@ -7,7 +7,7 @@ use {
     },
     futures_util::{StreamExt, TryFutureExt, TryStreamExt},
     relay_client::http::Client,
-    relay_rpc::{domain::Topic, rpc::MAX_SUBSCRIPTION_BATCH_SIZE},
+    relay_rpc::domain::Topic,
     sqlx::PgPool,
     std::{sync::Arc, time::Instant},
     tokio::sync::Mutex,
@@ -46,36 +46,13 @@ pub async fn run(
     let topics_count = topics.len();
     info!("topics_count: {topics_count}");
 
-    // Collect each batch into its own vec, since `batch_subscribe` would convert
-    // them anyway.
-    let topic_batches = topics
-        .chunks(MAX_SUBSCRIPTION_BATCH_SIZE)
-        .map(|chunk| chunk.to_vec())
-        .collect::<Vec<_>>();
-
     // Limit concurrency to avoid overwhelming the relay with requests.
-    const REQUEST_CONCURRENCY: usize = 200;
-
-    futures_util::stream::iter(topic_batches)
-        .map(|topic_batch| {
-            // Map result to an unsized type to avoid allocation when collecting,
-            // as we don't care about subscription IDs.
-            client.batch_subscribe_blocking(topic_batch).map_ok(|_| ())
-        })
-        .buffer_unordered(REQUEST_CONCURRENCY)
-        .try_collect::<Vec<_>>()
-        .await?;
-
-    let elapsed: u64 = start
-        .elapsed()
-        .as_millis()
-        .try_into()
-        .expect("No error getting ms of elapsed time");
-    info!("resubscribe took {elapsed}ms");
+    const REQUEST_CONCURRENCY: usize = 25;
 
     // If operation already running, don't start another one
     let mut operation_running = renew_all_topics_lock.lock().await;
     if !*operation_running {
         info!("Starting renew operation");
         *operation_running = true;
         // Renew all subscription TTLs.
         // This can take a long time (e.g. 2 hours), so cannot block server startup.
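The hunk above removes the batched, startup-blocking subscribe path; what remains is the renew loop further down, which subscribes per topic with the number of in-flight requests capped by `buffer_unordered(REQUEST_CONCURRENCY)`. Below is a rough, self-contained sketch of that concurrency-limiting pattern, not this repository's code: the `subscribe` stub and the synthetic topic strings are hypothetical stand-ins for the relay client call and real topics.

// Hedged sketch of the buffer_unordered concurrency-limit pattern.
use futures_util::{StreamExt, TryStreamExt};

const REQUEST_CONCURRENCY: usize = 25;

// Hypothetical stand-in for the relay client's per-topic subscribe call.
async fn subscribe(topic: String) -> Result<(), std::io::Error> {
    println!("subscribed to {topic}");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let topics: Vec<String> = (0..100).map(|i| format!("topic-{i}")).collect();

    // One request per topic, with at most REQUEST_CONCURRENCY in flight at a
    // time; `try_collect` returns the first error it sees, like `.await?` does
    // in the removed code above.
    futures_util::stream::iter(topics)
        .map(subscribe)
        .buffer_unordered(REQUEST_CONCURRENCY)
        .try_collect::<Vec<_>>()
        .await?;

    Ok(())
}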
@@ -86,7 +63,9 @@ pub async fn run(
             async move {
                 let client = &client;
                 let metrics = metrics.as_ref();
                 let start = Instant::now();
+                // Using `batch_subscription` was removed in https://github.com/WalletConnect/notify-server/pull/359
+                // We can't really use this right now because we are also extending the topic TTL which could take longer than the 5m TTL
                 let result = futures_util::stream::iter(topics)
                     .map(|topic| async move {
                         // Subscribe a second time as the initial subscription above may have expired
@@ -101,7 +80,7 @@ pub async fn run(
                     })
                     // Above we want to resubscribe as quickly as possible so use a high concurrency value
                     // But here we prefer stability and are OK with a lower value
-                    .buffer_unordered(25)
+                    .buffer_unordered(REQUEST_CONCURRENCY)
                     .try_collect::<Vec<_>>()
                     .await;
                 let elapsed: u64 = start.elapsed().as_millis().try_into().unwrap();
@@ -114,19 +93,25 @@ pub async fn run(
                     info!("Success renewing all topic subscriptions in {elapsed}ms");
                 }
                 *renew_all_topics_lock.lock().await = false;
+
+                if let Some(metrics) = metrics {
+                    let ctx = Context::current();
+                    metrics.subscribed_project_topics.observe(
+                        &ctx,
+                        project_topics_count as u64,
+                        &[],
+                    );
+                    metrics.subscribed_subscriber_topics.observe(
+                        &ctx,
+                        subscriber_topics_count as u64,
+                        &[],
+                    );
+                    metrics.subscribe_latency.record(&ctx, elapsed, &[]);
+                }
             }
         });
-    }
-
-    if let Some(metrics) = metrics {
-        let ctx = Context::current();
-        metrics
-            .subscribed_project_topics
-            .observe(&ctx, project_topics_count as u64, &[]);
-        metrics
-            .subscribed_subscriber_topics
-            .observe(&ctx, subscriber_topics_count as u64, &[]);
-        metrics.subscribe_latency.record(&ctx, elapsed, &[]);
+    } else {
+        info!("Renew operation already running");
     }
 
     Ok(())

Review thread on the added `info!("Renew operation already running");` line:
"Do we really need to print this log?"
"Can't hurt."
"Nice to have for later debugging if necessary."
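Taken together, the diff amounts to: run the slow renewal inside a spawned task, guard it with `renew_all_topics_lock` so only one renew runs at a time, and record metrics when the task finishes, so `run` itself returns without waiting. The following is a rough, self-contained sketch of that shape under stated assumptions; the `renew_all_topics` name, the sleep standing in for hours of per-topic renewal, and the `println!` calls are hypothetical, not code from this repository.

// Hedged sketch of the "don't block startup" guard-and-spawn shape.
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;

async fn renew_all_topics(renew_all_topics_lock: Arc<Mutex<bool>>) {
    // If the operation is already running, don't start another one.
    let mut operation_running = renew_all_topics_lock.lock().await;
    if !*operation_running {
        println!("Starting renew operation");
        *operation_running = true;
        // Release the guard before spawning so the background task can take the lock later.
        drop(operation_running);

        tokio::spawn(async move {
            // Stand-in for the long-running per-topic renewal (can take hours).
            tokio::time::sleep(Duration::from_secs(2)).await;

            // Clear the flag so a later call can start the operation again.
            *renew_all_topics_lock.lock().await = false;
            println!("Renew operation finished");
        });
    } else {
        println!("Renew operation already running");
    }
}

#[tokio::main]
async fn main() {
    let lock = Arc::new(Mutex::new(false));
    renew_all_topics(lock.clone()).await; // spawns the background renewal and returns immediately
    renew_all_topics(lock.clone()).await; // logs "Renew operation already running"
    tokio::time::sleep(Duration::from_secs(3)).await; // give the background task time to finish
}

Because the flag is set before spawning and cleared only when the spawned task ends, a second trigger during the renewal logs "Renew operation already running" and returns at once, while the caller never waits on the renewal itself, which is the behavior the PR title describes.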
Review thread on the `REQUEST_CONCURRENCY` value:
"Is it OK to decrease this by almost 10 times?"
"The variable was re-purposed. It was actually already set to 25."