
fix: don't block startup on subscribe #359

Merged
merged 2 commits on Feb 12, 2024
63 changes: 24 additions & 39 deletions src/services/relay_renewal_job/refresh_topic_subscriptions.rs
@@ -7,7 +7,7 @@ use {
},
futures_util::{StreamExt, TryFutureExt, TryStreamExt},
relay_client::http::Client,
relay_rpc::{domain::Topic, rpc::MAX_SUBSCRIPTION_BATCH_SIZE},
relay_rpc::domain::Topic,
sqlx::PgPool,
std::{sync::Arc, time::Instant},
tokio::sync::Mutex,
@@ -46,36 +46,13 @@ pub async fn run(
let topics_count = topics.len();
info!("topics_count: {topics_count}");

// Collect each batch into its own vec, since `batch_subscribe` would convert
// them anyway.
let topic_batches = topics
.chunks(MAX_SUBSCRIPTION_BATCH_SIZE)
.map(|chunk| chunk.to_vec())
.collect::<Vec<_>>();

// Limit concurrency to avoid overwhelming the relay with requests.
const REQUEST_CONCURRENCY: usize = 200;

futures_util::stream::iter(topic_batches)
.map(|topic_batch| {
// Map result to the unit type `()` to avoid allocation when collecting,
// as we don't care about subscription IDs.
client.batch_subscribe_blocking(topic_batch).map_ok(|_| ())
})
.buffer_unordered(REQUEST_CONCURRENCY)
.try_collect::<Vec<_>>()
.await?;

let elapsed: u64 = start
.elapsed()
.as_millis()
.try_into()
.expect("No error getting ms of elapsed time");
info!("resubscribe took {elapsed}ms");
const REQUEST_CONCURRENCY: usize = 25;
Contributor: Is it OK to decrease this by almost 10x?

chris13524 (Member, Author), Feb 12, 2024: The variable was re-purposed. It was actually already set to 25.

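The removed code above chunked the topic list into batches of `MAX_SUBSCRIPTION_BATCH_SIZE` before subscribing. A minimal std-only sketch of that chunking step (the batch size of 4 and the topic names are illustrative stand-ins, not the relay's actual constant):

```rust
// Sketch of the batching step from the removed code. The real code used
// relay_rpc's MAX_SUBSCRIPTION_BATCH_SIZE; 4 here is an arbitrary stand-in.
fn chunk_topics(topics: &[String], batch_size: usize) -> Vec<Vec<String>> {
    // `chunks` yields slices; collect each into its own Vec, since
    // `batch_subscribe` would convert them anyway.
    topics
        .chunks(batch_size)
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    let topics: Vec<String> = (0..10).map(|i| format!("topic-{i}")).collect();
    let batches = chunk_topics(&topics, 4);
    assert_eq!(batches.len(), 3); // 4 + 4 + 2
    assert_eq!(batches[2].len(), 2);
    println!("{} batches", batches.len());
}
```

In the removed code, each batch was then fed through `buffer_unordered(REQUEST_CONCURRENCY)` so at most that many batch requests were in flight against the relay at once.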

// If operation already running, don't start another one
let mut operation_running = renew_all_topics_lock.lock().await;
if !*operation_running {
info!("Starting renew operation");
*operation_running = true;
// Renew all subscription TTLs.
// This can take a long time (e.g. 2 hours), so cannot block server startup.
@@ -86,7 +63,9 @@
async move {
let client = &client;
let metrics = metrics.as_ref();
let start = Instant::now();

// Use of `batch_subscribe` was removed in https://github.com/WalletConnect/notify-server/pull/359
// We can't really use it right now because we are also extending the topic TTL, which could take longer than the 5m TTL
let result = futures_util::stream::iter(topics)
.map(|topic| async move {
// Subscribe a second time as the initial subscription above may have expired
@@ -101,7 +80,7 @@
})
// Above we want to resubscribe as quickly as possible so use a high concurrency value
// But here we prefer stability and are OK with a lower value
.buffer_unordered(25)
.buffer_unordered(REQUEST_CONCURRENCY)
.try_collect::<Vec<_>>()
.await;
let elapsed: u64 = start.elapsed().as_millis().try_into().unwrap();
@@ -114,19 +93,25 @@
info!("Success renewing all topic subscriptions in {elapsed}ms");
}
*renew_all_topics_lock.lock().await = false;

if let Some(metrics) = metrics {
let ctx = Context::current();
metrics.subscribed_project_topics.observe(
&ctx,
project_topics_count as u64,
&[],
);
metrics.subscribed_subscriber_topics.observe(
&ctx,
subscriber_topics_count as u64,
&[],
);
metrics.subscribe_latency.record(&ctx, elapsed, &[]);
}
}
});
}

if let Some(metrics) = metrics {
let ctx = Context::current();
metrics
.subscribed_project_topics
.observe(&ctx, project_topics_count as u64, &[]);
metrics
.subscribed_subscriber_topics
.observe(&ctx, subscriber_topics_count as u64, &[]);
metrics.subscribe_latency.record(&ctx, elapsed, &[]);
} else {
info!("Renew operation already running");
Contributor: Do we really need to print this log?

Reply: Can't hurt.

chris13524 (Member, Author): Nice to have for later debugging if necessary.

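The `renew_all_topics_lock` flag discussed here is what keeps a second renew operation from starting while one is in flight. A std-only sketch of the same pattern (the real code uses `tokio::sync::Mutex` and an async lock; the `try_start` name is illustrative):

```rust
use std::sync::{Arc, Mutex};

// If an operation is already running, don't start another one: the same
// flag pattern as `renew_all_topics_lock`, with std's Mutex standing in
// for tokio's async Mutex.
fn try_start(running: &Arc<Mutex<bool>>) -> bool {
    let mut operation_running = running.lock().unwrap();
    if *operation_running {
        false // renew operation already running
    } else {
        *operation_running = true; // claim the slot before spawning work
        true
    }
}

fn main() {
    let lock = Arc::new(Mutex::new(false));
    assert!(try_start(&lock)); // first caller starts the operation
    assert!(!try_start(&lock)); // a second caller is turned away
    *lock.lock().unwrap() = false; // operation finished; clear the flag
    assert!(try_start(&lock)); // now it can start again
}
```

In the PR, the spawned task resets the flag itself (`*renew_all_topics_lock.lock().await = false;`) once renewal finishes, so the guard protects the long-running work without blocking server startup.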
}

Ok(())
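The diff measures elapsed time as a `u64` of milliseconds via `try_into`, because `Instant::elapsed().as_millis()` returns a `u128`. A minimal sketch of that conversion (the summation is just a stand-in for the resubscribe work):

```rust
use std::time::Instant;

fn main() {
    let start = Instant::now();
    // Stand-in for the resubscribe work being timed.
    let _sum: u64 = (0..1_000u64).sum();
    // `as_millis()` returns u128; narrowing to u64 only fails after
    // roughly 584 million years of elapsed time, hence the expect.
    let elapsed: u64 = start
        .elapsed()
        .as_millis()
        .try_into()
        .expect("No error getting ms of elapsed time");
    println!("resubscribe took {elapsed}ms");
}
```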