Skip to content

Commit baac3b0

Browse files
committed
store: Add wait time for indexing permit to connection wait time
1 parent c7dbc41 commit baac3b0

File tree

1 file changed

+21
-11
lines changed

1 file changed

+21
-11
lines changed

store/postgres/src/pool/mod.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use graph::prelude::{
1717
use graph::prelude::{tokio, MetricsRegistry};
1818
use graph::slog::warn;
1919
use graph::util::timed_rw_lock::TimedMutex;
20+
use tokio::sync::OwnedSemaphorePermit;
2021

2122
use std::fmt::{self};
2223
use std::ops::{Deref, DerefMut};
@@ -48,7 +49,7 @@ pub type AsyncPgConnection = deadpool::managed::Object<ConnectionManager>;
4849
/// backpressure to prevent pool exhaustion during mass operations.
4950
pub struct PermittedConnection {
5051
conn: AsyncPgConnection,
51-
_permit: tokio::sync::OwnedSemaphorePermit,
52+
_permit: OwnedSemaphorePermit,
5253
}
5354

5455
impl Deref for PermittedConnection {
@@ -571,6 +572,10 @@ impl PoolInner {
571572
///
572573
/// If `timeouts` is `None`, the default pool timeouts are used.
573574
///
575+
/// The `prev_wait` duration is the time already spent waiting for a
576+
/// permit to get a connection; that time is added to the total wait
577+
/// time recorded.
578+
///
574579
/// On error, returns `StoreError::DatabaseUnavailable` and marks the
575580
/// pool as unavailable if we can tell that the error is due to the pool
576581
/// being closed. Returns `StoreError::StatementTimeout` if the error is
@@ -579,13 +584,14 @@ impl PoolInner {
579584
&self,
580585
pool: &AsyncPool,
581586
timeouts: Option<Timeouts>,
587+
prev_wait: Duration,
582588
) -> Result<AsyncPgConnection, StoreError> {
583589
let start = Instant::now();
584590
let res = match timeouts {
585591
Some(timeouts) => pool.timeout_get(&timeouts).await,
586592
None => pool.get().await,
587593
};
588-
let elapsed = start.elapsed();
594+
let elapsed = start.elapsed() + prev_wait;
589595
self.wait_meter.add_conn_wait_time(elapsed);
590596
match res {
591597
Ok(conn) => {
@@ -610,7 +616,7 @@ impl PoolInner {
610616
}
611617

612618
async fn get(&self) -> Result<AsyncPgConnection, StoreError> {
613-
self.get_from_pool(&self.pool, None).await
619+
self.get_from_pool(&self.pool, None, Duration::ZERO).await
614620
}
615621

616622
/// Get the pool for fdw connections. It is an error if none is configured
@@ -644,7 +650,7 @@ impl PoolInner {
644650
{
645651
let pool = self.fdw_pool(logger)?;
646652
loop {
647-
match self.get_from_pool(&pool, None).await {
653+
match self.get_from_pool(&pool, None, Duration::ZERO).await {
648654
Ok(conn) => return Ok(conn),
649655
Err(e) => {
650656
if timeout() {
@@ -671,7 +677,10 @@ impl PoolInner {
671677
create: None,
672678
recycle: None,
673679
};
674-
let Ok(conn) = self.get_from_pool(fdw_pool, Some(timeouts)).await else {
680+
let Ok(conn) = self
681+
.get_from_pool(fdw_pool, Some(timeouts), Duration::ZERO)
682+
.await
683+
else {
675684
return None;
676685
};
677686
Some(conn)
@@ -708,7 +717,7 @@ impl PoolInner {
708717
)
709718
}
710719

711-
pub(crate) async fn query_permit(&self) -> tokio::sync::OwnedSemaphorePermit {
720+
pub(crate) async fn query_permit(&self) -> OwnedSemaphorePermit {
712721
let start = Instant::now();
713722
let permit = self.query_semaphore.cheap_clone().acquire_owned().await;
714723
self.semaphore_wait_stats
@@ -721,23 +730,24 @@ impl PoolInner {
721730
/// Acquire a permit for indexing operations. This provides backpressure
722731
/// to prevent connection pool exhaustion during mass subgraph startup
723732
/// or high write load.
724-
async fn indexing_permit(&self) -> tokio::sync::OwnedSemaphorePermit {
733+
async fn indexing_permit(&self) -> (OwnedSemaphorePermit, Duration) {
725734
let start = Instant::now();
726735
let permit = self.indexing_semaphore.cheap_clone().acquire_owned().await;
736+
let elapsed = start.elapsed();
727737
self.indexing_semaphore_wait_stats
728738
.write()
729739
.unwrap()
730-
.add_and_register(start.elapsed(), &self.indexing_semaphore_wait_gauge);
731-
permit.unwrap()
740+
.add_and_register(elapsed, &self.indexing_semaphore_wait_gauge);
741+
(permit.unwrap(), elapsed)
732742
}
733743

734744
/// Get a connection with backpressure via semaphore permit. Use this
735745
/// for indexing operations to prevent pool exhaustion. This method will
736746
/// wait indefinitely until a permit, and with that, a connection is
737747
/// available.
738748
pub(crate) async fn get_permitted(&self) -> Result<PermittedConnection, StoreError> {
739-
let permit = self.indexing_permit().await;
740-
let conn = self.get().await?;
749+
let (permit, permit_wait) = self.indexing_permit().await;
750+
let conn = self.get_from_pool(&self.pool, None, permit_wait).await?;
741751
Ok(PermittedConnection {
742752
conn,
743753
_permit: permit,

0 commit comments

Comments
 (0)