From 5f38b209ca0194c1bb402102114edaaaeace43be Mon Sep 17 00:00:00 2001
From: Sam
Date: Thu, 21 Mar 2024 10:49:00 -0700
Subject: [PATCH] Only poll backend state once per scheduled jobs loop rather
 than per job (#23747)

Previously we'd check the backend state model once per running scheduled
job. This let us stop scheduled jobs a bit faster if the backend was
stopped mid-loop, but it also added a per-running-job check to the loop:
if we have 19 jobs running and can schedule one more, we'd parse the
backend state doc 20 times just to schedule a single new job. That's not
a big deal at low concurrency limits, but as we experiment with raising
the limits to 100 or even 1000, the repeated parsing becomes non-trivial
because of the N^2 nature of the scheduled jobs loop.

To fix this, I've given up on cancelling scheduled jobs partway through
the loop; the backend state is now checked once at the top of each
iteration. A loop iteration should only take a matter of milliseconds
anyway, so the behavior shouldn't be noticeably different for users.

I've also pulled the inner loop out into its own method,
query_and_start_jobs, to make it a little easier to tweak.
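As a rough sketch of the new shape of a single loop iteration (the
names, types, and job bookkeeping below are simplified stand-ins for
illustration, not the actual code in this patch):

    // Simplified model of one scheduler-loop iteration: the backend
    // state is read once up front instead of once per job.
    use std::collections::HashSet;
    use std::time::Duration;

    #[derive(Clone, Copy, PartialEq)]
    enum BackendState {
        Running,
        Paused,
        Disabled,
    }

    /// Check the backend state once, then scan jobs in next_ts order.
    /// Returns how long to sleep before polling again, if a wakeup
    /// timer is needed.
    fn poll_once(
        backend_state: BackendState,
        jobs: &[(u64, Duration)], // (job id, time until the job is due)
        running: &mut HashSet<u64>,
        parallelism: usize,
    ) -> Option<Duration> {
        // The single per-iteration check that replaces the per-job one.
        if backend_state != BackendState::Running {
            return None;
        }
        let mut next_wait = None;
        for &(job_id, due_in) in jobs {
            if running.contains(&job_id) {
                continue;
            }
            if due_in > Duration::ZERO {
                // Earliest pending job isn't due yet; sleep until it is.
                next_wait = Some(due_in);
                break;
            }
            if running.len() == parallelism {
                // Due, but at the concurrency limit; re-poll shortly.
                next_wait = Some(Duration::from_secs(5));
                break;
            }
            // The real code spawns the job here; we just mark it running.
            running.insert(job_id);
        }
        next_wait
    }

    fn main() {
        let mut running = HashSet::new();
        let jobs = [(1, Duration::ZERO), (2, Duration::from_secs(30))];
        let wait = poll_once(BackendState::Running, &jobs, &mut running, 16);
        assert!(running.contains(&1));
        assert_eq!(wait, Some(Duration::from_secs(30)));
    }

The real loop then turns the returned Option<Duration> into a wakeup
timer and selects over it together with the job-finished channel and a
database subscription.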
GitOrigin-RevId: 420c36d65ed5f00752a742260de16ac1ac8ebb18
---
 crates/application/src/scheduled_jobs/mod.rs | 135 ++++++++++---------
 1 file changed, 75 insertions(+), 60 deletions(-)

diff --git a/crates/application/src/scheduled_jobs/mod.rs b/crates/application/src/scheduled_jobs/mod.rs
index 5ba06b80..3865b243 100644
--- a/crates/application/src/scheduled_jobs/mod.rs
+++ b/crates/application/src/scheduled_jobs/mod.rs
@@ -221,72 +221,27 @@ impl ScheduledJobExecutor {
     async fn run(&self, backoff: &mut Backoff) -> anyhow::Result<()> {
         tracing::info!("Starting scheduled job executor");
-        let (job_result_tx, mut job_result_rx) =
+        let (job_finished_tx, mut job_finished_rx) =
             mpsc::channel(*SCHEDULED_JOB_EXECUTION_PARALLELISM);
         let mut running_job_ids = HashSet::new();
         loop {
-            Self::drain_finished_jobs(&mut running_job_ids, &mut job_result_rx).await;
+            Self::drain_finished_jobs(&mut running_job_ids, &mut job_finished_rx).await;
             let mut tx = self.database.begin(Identity::Unknown).await?;
-            // _backend_state appears unused but is needed to make sure the backend_state
-            // is part of the readset for the query we subscribe to.
-            let _backend_state = BackendStateModel::new(&mut tx).get_backend_state().await?;
-            let now = self.rt.generate_timestamp()?;
-            let index_query = Query::index_range(IndexRange {
-                index_name: SCHEDULED_JOBS_INDEX.clone(),
-                range: vec![IndexRangeExpression::Gt(
-                    NEXT_TS_FIELD.clone(),
-                    value::ConvexValue::Null,
-                )],
-                order: Order::Asc,
-            });
-            let mut query_stream = ResolvedQuery::new(&mut tx, index_query)?;
+            let backend_state = BackendStateModel::new(&mut tx).get_backend_state().await?;
+            let can_run = match backend_state {
+                BackendState::Running => true,
+                BackendState::Paused | BackendState::Disabled => false,
+            };
 
-            let mut next_job_wait = None;
-            while let Some(doc) = query_stream.next(&mut tx, None).await? {
-                // Get the backend state again in case of a race where jobs are scheduled and
-                // after the first tx begins the backend is paused.
-                let mut new_tx = self.database.begin(Identity::Unknown).await?;
-                let backend_state = BackendStateModel::new(&mut new_tx)
-                    .get_backend_state()
-                    .await?;
-                drop(new_tx);
-                match backend_state {
-                    BackendState::Running => {},
-                    BackendState::Paused | BackendState::Disabled => break,
-                }
-                let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
-                let (job_id, job) = job.clone().into_id_and_value();
-                if running_job_ids.contains(&job_id) {
-                    continue;
-                }
-                let next_ts = job.next_ts.ok_or_else(|| {
-                    anyhow::anyhow!("Could not get next_ts to run scheduled job at")
-                })?;
-                if next_ts > now {
-                    next_job_wait = Some(next_ts - now);
-                    break;
-                }
-                metrics::log_scheduled_job_execution_lag(now - next_ts);
-                if running_job_ids.len() == *SCHEDULED_JOB_EXECUTION_PARALLELISM {
-                    // We are due to execute the next job, but we can't because of
-                    // parallelism limits. We should break after logging the lag
-                    // here, and then wake up in few seconds to log the lag again
-                    // unless something else changes in between.
-                    next_job_wait = Some(Duration::from_secs(5));
-                    break;
-                }
-                let context = self.context.clone();
-                let tx = job_result_tx.clone();
-                self.rt.spawn("spawn_scheduled_job", async move {
-                    let result = context.execute_job(job, job_id).await;
-                    let _ = tx.send(result).await;
-                });
-
-                running_job_ids.insert(job_id);
-            }
+            let poll_next_time = if can_run {
+                self.query_and_start_jobs(&mut tx, &mut running_job_ids, &job_finished_tx)
+                    .await?
+            } else {
+                None
+            };
 
-            let next_job_future = if let Some(next_job_wait) = next_job_wait {
+            let next_job_future = if let Some(next_job_wait) = poll_next_time {
                 Either::Left(self.rt.wait(next_job_wait))
             } else {
                 Either::Right(std::future::pending())
@@ -296,7 +251,7 @@ impl ScheduledJobExecutor {
             let subscription = self.database.subscribe(token).await?;
 
             select_biased! {
-                job_id = job_result_rx.recv().fuse() => {
+                job_id = job_finished_rx.recv().fuse() => {
                     if let Some(job_id) = job_id {
                         running_job_ids.remove(&job_id);
                     } else {
@@ -311,6 +266,66 @@ impl ScheduledJobExecutor {
             backoff.reset();
         }
     }
+
+    /// Read through scheduled jobs and start any that are allowed by our
+    /// concurrency limit and the job's scheduled time.
+    ///
+    /// We return an optional amount of time we should sleep before polling
+    /// scheduled jobs again. For example, if a job isn't scheduled until
+    /// time T, we don't need to poll the jobs table again until T. Callers
+    /// are expected to handle other cases where we might want to poll again
+    /// sooner (like a new job being added).
+    async fn query_and_start_jobs(
+        &self,
+        tx: &mut Transaction<RT>,
+        running_job_ids: &mut HashSet<ResolvedDocumentId>,
+        job_finished_tx: &mpsc::Sender<ResolvedDocumentId>,
+    ) -> anyhow::Result<Option<Duration>> {
+        let now = self.rt.generate_timestamp()?;
+        let index_query = Query::index_range(IndexRange {
+            index_name: SCHEDULED_JOBS_INDEX.clone(),
+            range: vec![IndexRangeExpression::Gt(
+                NEXT_TS_FIELD.clone(),
+                value::ConvexValue::Null,
+            )],
+            order: Order::Asc,
+        });
+        let mut query_stream = ResolvedQuery::new(tx, index_query)?;
+        let mut next_job_wait = None;
+        while let Some(doc) = query_stream.next(tx, None).await? {
+            let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
+            let (job_id, job) = job.clone().into_id_and_value();
+            if running_job_ids.contains(&job_id) {
+                continue;
+            }
+            let next_ts = job
+                .next_ts
+                .ok_or_else(|| anyhow::anyhow!("Could not get next_ts to run scheduled job at"))?;
+            if next_ts > now {
+                next_job_wait = Some(next_ts - now);
+                break;
+            }
+
+            metrics::log_scheduled_job_execution_lag(now - next_ts);
+            if running_job_ids.len() == *SCHEDULED_JOB_EXECUTION_PARALLELISM {
+                // We are due to execute the next job, but we can't because of
+                // parallelism limits. We should break after logging the lag
+                // here, and then wake up in a few seconds to log the lag again
+                // unless something else changes in between.
+                next_job_wait = Some(Duration::from_secs(5));
+                break;
+            }
+            let context = self.context.clone();
+            let tx = job_finished_tx.clone();
+            self.rt.spawn("spawn_scheduled_job", async move {
+                let result = context.execute_job(job, job_id).await;
+                let _ = tx.send(result).await;
+            });
+
+            running_job_ids.insert(job_id);
+        }
+        Ok(next_job_wait)
+    }
 }
 
 impl ScheduledJobContext {
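A note on the Either::Right(std::future::pending()) arm in the loop
above: when query_and_start_jobs returns None (the backend is paused, or
no job is pending), there is no timer to arm, so the loop parks on a
future that never resolves and relies entirely on the database
subscription and the job-finished channel to wake it. A minimal
standalone sketch of that maybe-timer select idiom, using tokio as a
stand-in for this crate's runtime abstraction:

    use std::time::Duration;

    use futures::future::Either;

    // Stand-in for the database subscription the real loop waits on.
    async fn subscription_changed() {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    #[tokio::main]
    async fn main() {
        // `None` models "backend paused or no pending jobs": no timer,
        // so only an external event should wake the loop.
        let poll_next_time: Option<Duration> = None;

        let next_job_future = match poll_next_time {
            Some(wait) => Either::Left(tokio::time::sleep(wait)),
            None => Either::Right(std::future::pending::<()>()),
        };

        tokio::select! {
            _ = next_job_future => println!("wakeup timer fired; poll jobs again"),
            _ = subscription_changed() => println!("woken by a data change"),
        }
    }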