Skip to content

Commit

Permalink
bugfix: ingest worker CPU usage spiking when redis dies
Browse files Browse the repository at this point in the history
  • Loading branch information
skeptrunedev authored and cdxker committed Apr 9, 2024
1 parent 54b140f commit a94ed0e
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions server/src/bin/ingestion-microservice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async fn ingestion_service(
) {
log::info!("Starting ingestion service thread");

let mut sleep_time = std::time::Duration::from_secs(1);
let mut redis_conn_sleep = std::time::Duration::from_secs(1);

#[allow(unused_assignments)]
let mut opt_redis_connection = None;
Expand All @@ -189,13 +189,15 @@ async fn ingestion_service(
break;
}

tokio::time::sleep(sleep_time).await;
sleep_time = std::cmp::min(sleep_time * 2, std::time::Duration::from_secs(60));
tokio::time::sleep(redis_conn_sleep).await;
redis_conn_sleep = std::cmp::min(redis_conn_sleep * 2, std::time::Duration::from_secs(300));
}

let mut redis_connection =
opt_redis_connection.expect("Failed to get redis connection outside of loop");

let mut broken_pipe_sleep = std::time::Duration::from_secs(10);

loop {
if should_terminate.load(Ordering::Relaxed) {
log::info!("Shutting down");
Expand All @@ -216,6 +218,8 @@ async fn ingestion_service(
brpoplpush_transaction.finish();

let serialized_message = if let Ok(payload) = payload_result {
broken_pipe_sleep = std::time::Duration::from_secs(10);

if payload.is_empty() {
continue;
}
Expand All @@ -226,6 +230,12 @@ async fn ingestion_service(
.clone()
} else {
log::error!("Unable to process {:?}", payload_result);

if payload_result.is_err_and(|err| err.is_io_error()) {
tokio::time::sleep(broken_pipe_sleep).await;
broken_pipe_sleep = std::cmp::min(broken_pipe_sleep * 2, std::time::Duration::from_secs(300));
}

continue;
};

Expand Down

0 comments on commit a94ed0e

Please sign in to comment.