Only poll backend state once per scheduled jobs loop rather than per job (#23747)

Previously we'd check the backend state model once per running scheduled job. This allowed us to stop scheduled jobs a bit faster if the backend was stopped during the loop.

However, it also introduced a per-running-job check inside the loop. If we have 19 jobs running and can schedule one more, we'd parse the backend state doc 20 times just to schedule that one new job. That's not a big deal at low concurrency limits, but as we experiment with raising the limits to 100 or even 1000, the repeated parsing becomes non-trivial because of the N^2 nature of the scheduled jobs loop.

To fix this, I've given up on cancelling scheduled jobs partway through the loop and instead check the backend state once per iteration. The loop should only take a matter of milliseconds anyway, so the behavior shouldn't be noticeably different for users.

I've also pulled the inner loop out into its own method, query_and_start_jobs, to make this a little easier to tweak.
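
As a rough illustration of the cost change, here's a minimal sketch with hypothetical, simplified stand-ins (the real code uses BackendStateModel inside a database transaction; see the diff below):

// Hypothetical types for illustration only; not the Convex models.
#[derive(PartialEq)]
enum BackendState {
    Running,
    Paused,
}

struct Backend {
    state: BackendState,
}

impl Backend {
    // Stands in for reading and parsing the backend state doc.
    fn parse_state(&self) -> &BackendState {
        &self.state
    }
}

// Before: the state doc is parsed once per candidate job, so a pass over N
// jobs costs N parses, and repeated passes push the loop toward N^2 work.
fn schedule_pass_before(jobs: &[&str], backend: &Backend) {
    for job in jobs {
        if *backend.parse_state() != BackendState::Running {
            break; // could stop mid-pass if the backend was paused
        }
        println!("starting {job}");
    }
}

// After: one parse per pass, checked up front; a paused backend skips the
// whole pass, and we give up on stopping mid-pass.
fn schedule_pass_after(jobs: &[&str], backend: &Backend) {
    if *backend.parse_state() != BackendState::Running {
        return;
    }
    for job in jobs {
        println!("starting {job}");
    }
}

fn main() {
    let backend = Backend {
        state: BackendState::Running,
    };
    schedule_pass_before(&["a", "b"], &backend);
    schedule_pass_after(&["a", "b"], &backend);
}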

GitOrigin-RevId: 420c36d65ed5f00752a742260de16ac1ac8ebb18
sjudd authored and Convex, Inc. committed Mar 21, 2024
1 parent 9e0fb73 commit 5f38b20
Showing 1 changed file with 75 additions and 60 deletions: crates/application/src/scheduled_jobs/mod.rs
@@ -221,72 +221,27 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
 
     async fn run(&self, backoff: &mut Backoff) -> anyhow::Result<()> {
         tracing::info!("Starting scheduled job executor");
-        let (job_result_tx, mut job_result_rx) =
+        let (job_finished_tx, mut job_finished_rx) =
             mpsc::channel(*SCHEDULED_JOB_EXECUTION_PARALLELISM);
         let mut running_job_ids = HashSet::new();
         loop {
-            Self::drain_finished_jobs(&mut running_job_ids, &mut job_result_rx).await;
+            Self::drain_finished_jobs(&mut running_job_ids, &mut job_finished_rx).await;
 
             let mut tx = self.database.begin(Identity::Unknown).await?;
-            // _backend_state appears unused but is needed to make sure the backend_state
-            // is part of the readset for the query we subscribe to.
-            let _backend_state = BackendStateModel::new(&mut tx).get_backend_state().await?;
-            let now = self.rt.generate_timestamp()?;
-            let index_query = Query::index_range(IndexRange {
-                index_name: SCHEDULED_JOBS_INDEX.clone(),
-                range: vec![IndexRangeExpression::Gt(
-                    NEXT_TS_FIELD.clone(),
-                    value::ConvexValue::Null,
-                )],
-                order: Order::Asc,
-            });
-            let mut query_stream = ResolvedQuery::new(&mut tx, index_query)?;
+            let backend_state = BackendStateModel::new(&mut tx).get_backend_state().await?;
+            let can_run = match backend_state {
+                BackendState::Running => true,
+                BackendState::Paused | BackendState::Disabled => false,
+            };
 
-            let mut next_job_wait = None;
-            while let Some(doc) = query_stream.next(&mut tx, None).await? {
-                // Get the backend state again in case of a race where jobs are scheduled and
-                // after the first tx begins the backend is paused.
-                let mut new_tx = self.database.begin(Identity::Unknown).await?;
-                let backend_state = BackendStateModel::new(&mut new_tx)
-                    .get_backend_state()
-                    .await?;
-                drop(new_tx);
-                match backend_state {
-                    BackendState::Running => {},
-                    BackendState::Paused | BackendState::Disabled => break,
-                }
-                let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
-                let (job_id, job) = job.clone().into_id_and_value();
-                if running_job_ids.contains(&job_id) {
-                    continue;
-                }
-                let next_ts = job.next_ts.ok_or_else(|| {
-                    anyhow::anyhow!("Could not get next_ts to run scheduled job at")
-                })?;
-                if next_ts > now {
-                    next_job_wait = Some(next_ts - now);
-                    break;
-                }
-                metrics::log_scheduled_job_execution_lag(now - next_ts);
-                if running_job_ids.len() == *SCHEDULED_JOB_EXECUTION_PARALLELISM {
-                    // We are due to execute the next job, but we can't because of
-                    // parallelism limits. We should break after logging the lag
-                    // here, and then wake up in few seconds to log the lag again
-                    // unless something else changes in between.
-                    next_job_wait = Some(Duration::from_secs(5));
-                    break;
-                }
-                let context = self.context.clone();
-                let tx = job_result_tx.clone();
-                self.rt.spawn("spawn_scheduled_job", async move {
-                    let result = context.execute_job(job, job_id).await;
-                    let _ = tx.send(result).await;
-                });
-
-                running_job_ids.insert(job_id);
-            }
+            let poll_next_time = if can_run {
+                self.query_and_start_jobs(&mut tx, &mut running_job_ids, &job_finished_tx)
+                    .await?
+            } else {
+                None
+            };
 
-            let next_job_future = if let Some(next_job_wait) = next_job_wait {
+            let next_job_future = if let Some(next_job_wait) = poll_next_time {
                 Either::Left(self.rt.wait(next_job_wait))
             } else {
                 Either::Right(std::future::pending())
@@ -296,7 +251,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
             let subscription = self.database.subscribe(token).await?;
 
             select_biased! {
-                job_id = job_result_rx.recv().fuse() => {
+                job_id = job_finished_rx.recv().fuse() => {
                     if let Some(job_id) = job_id {
                         running_job_ids.remove(&job_id);
                     } else {
@@ -311,6 +266,66 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
             backoff.reset();
         }
     }
+
+    /// Read through scheduled jobs and start any that are allowed by our
+    /// concurrency limit and the job's scheduled time.
+    ///
+    /// We return an optional amount of time we should sleep before polling
+    /// scheduled jobs again. For example, if a job isn't scheduled until
+    /// time T, we don't need to poll the jobs table again until T. Callers
+    /// are expected to handle other cases why we might want to poll (like a
+    /// new job being added).
+    async fn query_and_start_jobs(
+        &self,
+        tx: &mut Transaction<RT>,
+        running_job_ids: &mut HashSet<ResolvedDocumentId>,
+        job_finished_tx: &mpsc::Sender<ResolvedDocumentId>,
+    ) -> anyhow::Result<Option<Duration>> {
+        let now = self.rt.generate_timestamp()?;
+        let index_query = Query::index_range(IndexRange {
+            index_name: SCHEDULED_JOBS_INDEX.clone(),
+            range: vec![IndexRangeExpression::Gt(
+                NEXT_TS_FIELD.clone(),
+                value::ConvexValue::Null,
+            )],
+            order: Order::Asc,
+        });
+        let mut query_stream = ResolvedQuery::new(tx, index_query)?;
+        let mut next_job_wait = None;
+        while let Some(doc) = query_stream.next(tx, None).await? {
+            let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
+            let (job_id, job) = job.clone().into_id_and_value();
+            if running_job_ids.contains(&job_id) {
+                continue;
+            }
+            let next_ts = job
+                .next_ts
+                .ok_or_else(|| anyhow::anyhow!("Could not get next_ts to run scheduled job at"))?;
+            if next_ts > now {
+                next_job_wait = Some(next_ts - now);
+                break;
+            }
+
+            metrics::log_scheduled_job_execution_lag(now - next_ts);
+            if running_job_ids.len() == *SCHEDULED_JOB_EXECUTION_PARALLELISM {
+                // We are due to execute the next job, but we can't because of
+                // parallelism limits. We should break after logging the lag
+                // here, and then wake up in few seconds to log the lag again
+                // unless something else changes in between.
+                next_job_wait = Some(Duration::from_secs(5));
+                break;
+            }
+            let context = self.context.clone();
+            let tx = job_finished_tx.clone();
+            self.rt.spawn("spawn_scheduled_job", async move {
+                let result = context.execute_job(job, job_id).await;
+                let _ = tx.send(result).await;
+            });
+
+            running_job_ids.insert(job_id);
+        }
+        Ok(next_job_wait)
+    }
 }
 
 impl<RT: Runtime> ScheduledJobContext<RT> {
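
The Option<Duration> returned by query_and_start_jobs feeds the wakeup logic in run(): Some(wait) becomes a timer, None becomes a future that never resolves, and either is raced against job completions and the database subscription. Here's a standalone sketch of that pattern, assuming tokio and the futures crate in place of Convex's Runtime abstraction (the 50ms stand-in wakeup source is made up):

use std::time::Duration;

use futures::future::Either;

// Race an optional "next job due" timer against another wakeup source.
async fn wait_for_next_poll(next_job_wait: Option<Duration>) {
    let next_job_future = match next_job_wait {
        // A job is due after `wait`: wake up then and poll again.
        Some(wait) => Either::Left(tokio::time::sleep(wait)),
        // Nothing is scheduled: this future never resolves, so only the
        // other wakeup source can end the wait.
        None => Either::Right(std::future::pending()),
    };
    tokio::select! {
        _ = next_job_future => println!("woke up: the next job is due"),
        // Stand-in for the real wakeup sources (a job finishing or the
        // subscribed query changing).
        _ = tokio::time::sleep(Duration::from_millis(50)) => {
            println!("woke up: something else changed");
        }
    }
}

#[tokio::main]
async fn main() {
    wait_for_next_poll(Some(Duration::from_millis(10))).await; // timer fires first
    wait_for_next_poll(None).await; // only the stand-in source can fire
}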
