fix: use batch subscribe and remove 4050 #388

Merged
3 commits merged on Mar 5, 2024

Changes from 2 commits
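For context on the title: this PR stops extending subscription TTLs with a per-topic noop publish (tag 4050) and subscribes to relay topics in batches instead. A rough sketch of the call-shape change follows, using only the client methods that actually appear in this diff (subscribe_blocking and batch_subscribe_blocking); the wrapper functions and their signatures are illustrative, not code from the repository.

    use relay_client::http::Client;
    use relay_rpc::domain::Topic;

    // Illustrative only: before this PR, topics were subscribed one at a time,
    // and each was kept alive with a separate noop publish (NOTIFY_NOOP_TAG = 4050).
    async fn subscribe_before(client: &Client, topics: Vec<Topic>) {
        for topic in topics {
            let _ = client.subscribe_blocking(topic).await;
        }
    }

    // Illustrative only: after this PR, all topics go through a single batched call,
    // and the noop tag plus its TTL constant are removed from spec.rs.
    async fn subscribe_after(client: &Client, topics: Vec<Topic>) {
        let _ = client.batch_subscribe_blocking(topics).await;
    }
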
55 changes: 55 additions & 0 deletions src/metrics.rs
@@ -39,6 +39,10 @@ pub struct Metrics {
relay_subscribe_failures: Counter<u64>,
relay_subscribe_latency: Histogram<u64>,
relay_subscribe_request_latency: Histogram<u64>,
relay_batch_subscribes: Counter<u64>,
relay_batch_subscribe_failures: Counter<u64>,
relay_batch_subscribe_latency: Histogram<u64>,
relay_batch_subscribe_request_latency: Histogram<u64>,
postgres_queries: Counter<u64>,
postgres_query_latency: Histogram<u64>,
keys_server_requests: Counter<u64>,
@@ -136,6 +140,28 @@ impl Metrics {
.with_description("The latency subscribing to relay topics")
.init();

let relay_batch_subscribes: Counter<u64> = meter
.u64_counter("relay_batch_subscribes")
.with_description(
"The number of batch subscribes to relay topics (not including retries)",
)
.init();

let relay_batch_subscribe_failures: Counter<u64> = meter
.u64_counter("relay_batch_subscribe_failures")
.with_description("The number of failures to batch subscribe to relay topics")
.init();

let relay_batch_subscribe_latency: Histogram<u64> = meter
.u64_histogram("relay_batch_subscribe_latency")
.with_description("The latency batch subscribing to relay topics w/ built-in retry")
.init();

let relay_batch_subscribe_request_latency: Histogram<u64> = meter
.u64_histogram("relay_batch_subscribe_request_latency")
.with_description("The latency batch subscribing to relay topics")
.init();

let postgres_queries: Counter<u64> = meter
.u64_counter("postgres_queries")
.with_description("The number of Postgres queries executed")
@@ -222,6 +248,10 @@ impl Metrics {
relay_subscribe_failures,
relay_subscribe_latency,
relay_subscribe_request_latency,
relay_batch_subscribes,
relay_batch_subscribe_failures,
relay_batch_subscribe_latency,
relay_batch_subscribe_request_latency,
postgres_queries,
postgres_query_latency,
keys_server_requests,
@@ -338,6 +368,31 @@ impl Metrics {
.record(&ctx, elapsed.as_millis() as u64, &[]);
}

pub fn relay_batch_subscribe(&self, success: bool, start: Instant) {
let elapsed = start.elapsed();

let ctx = Context::current();
let attributes = [KeyValue::new("success", success.to_string())];
self.relay_batch_subscribes.add(&ctx, 1, &attributes);
self.relay_batch_subscribe_latency
.record(&ctx, elapsed.as_millis() as u64, &attributes);
}

pub fn relay_batch_subscribe_failure(&self, is_permanent: bool) {
let ctx = Context::current();
let attributes = [KeyValue::new("is_permanent", is_permanent.to_string())];
self.relay_batch_subscribe_failures
.add(&ctx, 1, &attributes);
}

pub fn relay_batch_subscribe_request(&self, start: Instant) {
let elapsed = start.elapsed();

let ctx = Context::current();
self.relay_batch_subscribe_request_latency
.record(&ctx, elapsed.as_millis() as u64, &[]);
}

pub fn postgres_query(&self, query_name: &'static str, start: Instant) {
let elapsed = start.elapsed();

97 changes: 69 additions & 28 deletions src/publish_relay_message.rs
@@ -1,17 +1,11 @@
use {
crate::{
metrics::Metrics,
spec::{NOTIFY_NOOP_TAG, NOTIFY_NOOP_TTL},
},
crate::metrics::Metrics,
relay_client::{error::Error, http::Client},
relay_rpc::{
domain::Topic,
rpc::{self, msg_id::get_message_id, Publish, PublishError, SubscriptionError},
},
std::{
sync::{Arc, OnceLock},
time::{Duration, Instant},
},
std::time::{Duration, Instant},
tokio::time::sleep,
tracing::{error, info, instrument, warn},
};
@@ -33,7 +27,7 @@ pub async fn publish_relay_message(
info!("publish_relay_message");
let start = Instant::now();

let client_publish_call = || async {
let call = || async {
let start = Instant::now();
let result = relay_client
.publish(
@@ -63,7 +57,7 @@
};

let mut tries = 0;
while let Err(e) = client_publish_call().await {
while let Err(e) = call().await {
tries += 1;
let is_permanent = tries >= 10;
if let Some(metrics) = metrics {
@@ -104,7 +98,7 @@ pub async fn subscribe_relay_topic(
info!("subscribe_relay_topic");
let start = Instant::now();

let client_publish_call = || async {
let call = || async {
let start = Instant::now();
let result = relay_client.subscribe_blocking(topic.clone()).await;
if let Some(metrics) = metrics {
@@ -127,7 +121,7 @@
};

let mut tries = 0;
while let Err(e) = client_publish_call().await {
while let Err(e) = call().await {
tries += 1;
let is_permanent = tries >= 10;
if let Some(metrics) = metrics {
@@ -162,25 +156,72 @@ pub async fn subscribe_relay_topic(
}

#[instrument(skip(relay_client, metrics))]
pub async fn extend_subscription_ttl(
pub async fn batch_subscribe_relay_topics(
relay_client: &Client,
topic: Topic,
topics: Vec<Topic>,
metrics: Option<&Metrics>,
) -> Result<(), Error<PublishError>> {
info!("extend_subscription_ttl");

// Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime
static LOCK: OnceLock<Arc<str>> = OnceLock::new();
let message = LOCK.get_or_init(|| "".into()).clone();

let publish = Publish {
topic,
message,
tag: NOTIFY_NOOP_TAG,
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32,
prompt: false,
) -> Result<(), Error<SubscriptionError>> {
info!("batch_subscribe_relay_topic");
let start = Instant::now();

let call = || async {
let start = Instant::now();
let result = relay_client.batch_subscribe_blocking(topics.clone()).await;
if let Some(metrics) = metrics {
metrics.relay_batch_subscribe_request(start);
}
// TODO process each error individually
// TODO retry relay internal failures?

// https://github.com/WalletConnect/notify-server/issues/395
match result {
Ok(_) => Ok(()),
Err(e) => match e {
Error::Response(rpc::Error::Handler(
SubscriptionError::SubscriberLimitExceeded,
)) => {
// FIXME figure out how to handle this properly; being unable to subscribe means a broken state
Review thread on this line:
@nopestack (Mar 5, 2024): Consider creating an issue to address this and linking it here.
@chris13524 (Member, Author; Mar 5, 2024): This issue was root caused and no issue should currently exist, because Notify Server always subscribes first. Confirmed with logs. Removing this error handling logic in 6e4cb5c as the error should not happen.

// https://walletconnect.slack.com/archives/C058RS0MH38/p1708183383748259
warn!("Subscriber limit exceeded for topics {topics:?}");
Ok(())
}
e => Err(e),
},
}
};
publish_relay_message(relay_client, &publish, metrics).await

let mut tries = 0;
while let Err(e) = call().await {
tries += 1;
let is_permanent = tries >= 10;
if let Some(metrics) = metrics {
metrics.relay_batch_subscribe_failure(is_permanent);
}

if is_permanent {
error!("Permanent error batch subscribing to topics, took {tries} tries: {e:?}");

if let Some(metrics) = metrics {
// TODO make DRY with end-of-function call
metrics.relay_batch_subscribe(false, start);
}
return Err(e);
}

let retry_in = calculate_retry_in(tries);
warn!(
"Temporary error batch subscribing to topics, retrying attempt {tries} in {retry_in:?}: {e:?}"
);
sleep(retry_in).await;
}

if let Some(metrics) = metrics {
metrics.relay_batch_subscribe(true, start);
}

// Sleep to account for some replication lag. Without this, the subscription may not be active on all nodes
sleep(Duration::from_millis(250)).await;

Ok(())
}

#[cfg(test)]
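The new batch_subscribe_relay_topics follows the same retry shape as publish_relay_message and subscribe_relay_topic above: wrap the relay call in a closure, retry up to 10 attempts with a delay from calculate_retry_in, and record metrics per request and per overall outcome. A generic, self-contained sketch of that loop is below; the helper name retry_until_permanent and the backoff values are invented for illustration (calculate_retry_in's real body lives elsewhere in this file and is not shown in the diff), and the metrics calls are omitted.

    use std::time::Duration;
    use tokio::time::sleep;

    // Illustrative backoff only; not the crate's calculate_retry_in.
    fn calculate_retry_in(tries: u32) -> Duration {
        Duration::from_millis(250) * tries.min(20)
    }

    // Generic sketch of the retry loop that the three functions above inline.
    async fn retry_until_permanent<F, Fut, T, E>(call: F) -> Result<T, E>
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        let mut tries = 0u32;
        loop {
            match call().await {
                Ok(value) => return Ok(value),
                Err(e) => {
                    tries += 1;
                    if tries >= 10 {
                        // Treated as permanent after 10 attempts, mirroring is_permanent above.
                        return Err(e);
                    }
                    sleep(calculate_retry_in(tries)).await;
                }
            }
        }
    }

A call site would look like retry_until_permanent(|| async { relay_client.batch_subscribe_blocking(topics.clone()).await }).await, with the metrics hooks added back around the call and the loop.
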
@@ -7,9 +7,7 @@ use {
},
error::NotifyServerError,
model::helpers::{get_project_by_topic, get_welcome_notification, upsert_subscriber},
publish_relay_message::{
extend_subscription_ttl, publish_relay_message, subscribe_relay_topic,
},
publish_relay_message::{publish_relay_message, subscribe_relay_topic},
rate_limit::{self, Clock, RateLimitError},
registry::storage::redis::Redis,
rpc::{decode_key, derive_key, JsonRpcResponse, NotifySubscribe, ResponseAuth},
@@ -275,10 +273,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
info!("Finished publishing subscribe response");
}

extend_subscription_ttl(&state.relay_client, notify_topic, state.metrics.as_ref())
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?

// TODO do in same txn as upsert_subscriber()
if subscriber.inserted {
let welcome_notification =
@@ -2,7 +2,7 @@ use {
crate::{
error::NotifyServerError,
model::helpers::upsert_project,
publish_relay_message::{extend_subscription_ttl, subscribe_relay_topic},
publish_relay_message::subscribe_relay_topic,
rate_limit::{self, Clock, RateLimitError},
registry::{extractor::AuthedProjectId, storage::redis::Redis},
state::AppState,
@@ -103,9 +103,6 @@ pub async fn handler(
info!("Subscribing to project topic: {topic}");
subscribe_relay_topic(&state.relay_client, &topic, state.metrics.as_ref()).await?;

info!("Extending subscription TTL");
extend_subscription_ttl(&state.relay_client, topic.clone(), state.metrics.as_ref()).await?;

info!("Successfully subscribed to project topic: {topic}");
Ok(Json(SubscribeTopicResponseBody {
authentication_key: project.authentication_public_key,
49 changes: 21 additions & 28 deletions src/services/relay_renewal_job/refresh_topic_subscriptions.rs
@@ -3,9 +3,9 @@ use {
error::NotifyServerError,
metrics::Metrics,
model::helpers::{get_project_topics, get_subscriber_topics},
publish_relay_message::{extend_subscription_ttl, subscribe_relay_topic},
publish_relay_message::batch_subscribe_relay_topics,
},
futures_util::{StreamExt, TryFutureExt, TryStreamExt},
futures_util::StreamExt,
relay_client::http::Client,
relay_rpc::domain::Topic,
sqlx::PgPool,
@@ -46,6 +46,14 @@ pub async fn run(
let topics_count = topics.len();
info!("topics_count: {topics_count}");

let topic_batches = topics
// Chunk as 1 since we don't yet have the ability to process each topic error individually
// https://github.com/WalletConnect/notify-server/issues/395
// .chunks(MAX_SUBSCRIPTION_BATCH_SIZE)
.chunks(1)
.map(|topics| topics.to_vec())
.collect::<Vec<_>>();

// Limit concurrency to avoid overwhelming the relay with requests.
const REQUEST_CONCURRENCY: usize = 25;

@@ -64,36 +72,21 @@
let client = &client;
let metrics = metrics.as_ref();

// Using `batch_subscription` was removed in https://github.com/WalletConnect/notify-server/pull/359
// We can't really use this right now because we are also extending the topic TTL which could take longer than the 5m TTL
let result = futures_util::stream::iter(topics)
.map(|topic| async move {
// Subscribe a second time as the initial subscription above may have expired
subscribe_relay_topic(client, &topic, metrics)
.map_ok(|_| ())
.map_err(NotifyServerError::from)
.and_then(|_| {
// Subscribing only guarantees 5m TTL, so we always need to extend it.
extend_subscription_ttl(client, topic.clone(), metrics)
.map_ok(|_| ())
.map_err(Into::into)
})
.await
})
// Above we want to resubscribe as quickly as possible so use a high concurrency value
// But here we prefer stability and are OK with a lower value
let result = futures_util::stream::iter(topic_batches)
.map(|topics| batch_subscribe_relay_topics(client, topics, metrics))
.buffer_unordered(REQUEST_CONCURRENCY)
.try_collect::<Vec<_>>()
.collect::<Vec<_>>()
.await;
let elapsed: u64 = start.elapsed().as_millis().try_into().unwrap();
if let Err(e) = result {
// An error here is bad, as topics will not have been renewed.
// However, this should be rare and many resubscribes will happen within 30 days so all topics should be renewed eventually.
// With <https://github.com/WalletConnect/notify-server/issues/325> we will be able to guarantee renewal much better.
error!("Failed to renew all topic subscriptions in {elapsed}ms: {e}");
} else {
info!("Success renewing all topic subscriptions in {elapsed}ms");
for result in &result {
if let Err(e) = result {
// An error here is bad, as topics will not have been renewed.
// However, this should be rare and many resubscribes will happen within 30 days so all topics should be renewed eventually.
// With <https://github.com/WalletConnect/notify-server/issues/325> we will be able to guarantee renewal much better.
error!("Failed to renew some topic subscriptions: {e}");
}
}
info!("Completed topic renew job (possibly with errors) in {elapsed}ms");
*renew_all_topics_lock.lock().await = false;

if let Some(metrics) = metrics {
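The renewal job above splits topics into batches (currently batches of 1, until per-topic error handling lands via issue 395) and runs the batch subscribes with bounded concurrency through buffer_unordered, collecting every per-batch result instead of aborting on the first error. Below is a self-contained sketch of that stream shape, with a stand-in subscribe_batch in place of batch_subscribe_relay_topics and plain strings in place of Topic.

    use futures_util::StreamExt;

    // Stand-in for batch_subscribe_relay_topics(client, batch, metrics).
    async fn subscribe_batch(batch: Vec<String>) -> Result<(), String> {
        println!("subscribing {} topics", batch.len());
        Ok(())
    }

    async fn renew_all(topics: Vec<String>) -> Vec<Result<(), String>> {
        // Bound on in-flight relay requests, matching REQUEST_CONCURRENCY above.
        const REQUEST_CONCURRENCY: usize = 25;
        let batches: Vec<Vec<String>> = topics
            .chunks(1) // would become MAX_SUBSCRIPTION_BATCH_SIZE once errors are handled per topic
            .map(|chunk| chunk.to_vec())
            .collect();
        futures_util::stream::iter(batches)
            .map(subscribe_batch)
            .buffer_unordered(REQUEST_CONCURRENCY)
            .collect::<Vec<_>>()
            .await
    }

Each batch's error shows up in the returned Vec, which matches how run() above logs every failed batch and still finishes the job.
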
2 changes: 0 additions & 2 deletions src/spec.rs
@@ -25,7 +25,6 @@ pub const NOTIFY_WATCH_SUBSCRIPTIONS_TAG: u32 = 4010;
pub const NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG: u32 = 4011;
pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TAG: u32 = 4012;
pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG: u32 = 4013;
pub const NOTIFY_NOOP_TAG: u32 = 4050;
pub const NOTIFY_GET_NOTIFICATIONS_TAG: u32 = 4014;
pub const NOTIFY_GET_NOTIFICATIONS_RESPONSE_TAG: u32 = 4015;

@@ -56,7 +55,6 @@ pub const NOTIFY_WATCH_SUBSCRIPTIONS_TTL: Duration = T300;
pub const NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TTL: Duration = T300;
pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TTL: Duration = T300;
pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TTL: Duration = T300;
pub const NOTIFY_NOOP_TTL: Duration = T300;
pub const NOTIFY_GET_NOTIFICATIONS_TTL: Duration = T300;
pub const NOTIFY_GET_NOTIFICATIONS_RESPONSE_TTL: Duration = T300;
