diff --git a/core/graphman/src/commands/deployment/pause.rs b/core/graphman/src/commands/deployment/pause.rs index 9b2e78102ed..0257768a03b 100644 --- a/core/graphman/src/commands/deployment/pause.rs +++ b/core/graphman/src/commands/deployment/pause.rs @@ -37,7 +37,10 @@ pub async fn load_active_deployment( primary_pool: ConnectionPool, deployment: &DeploymentSelector, ) -> Result { - let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?; + let mut primary_conn = primary_pool + .get_permitted() + .await + .map_err(GraphmanError::from)?; let locator = crate::deployment::load_deployment_locator( &mut primary_conn, @@ -76,7 +79,7 @@ pub async fn pause_active_deployment( notification_sender: Arc, active_deployment: ActiveDeployment, ) -> Result<(), GraphmanError> { - let primary_conn = primary_pool.get().await?; + let primary_conn = primary_pool.get_permitted().await?; let mut catalog_conn = catalog::Connection::new(primary_conn); let changes = catalog_conn.pause_subgraph(&active_deployment.site).await?; diff --git a/core/graphman/src/commands/deployment/reassign.rs b/core/graphman/src/commands/deployment/reassign.rs index d3979dadba7..5d8e282f306 100644 --- a/core/graphman/src/commands/deployment/reassign.rs +++ b/core/graphman/src/commands/deployment/reassign.rs @@ -29,7 +29,10 @@ impl Deployment { &self, primary_pool: ConnectionPool, ) -> Result, GraphmanError> { - let primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?; + let primary_conn = primary_pool + .get_permitted() + .await + .map_err(GraphmanError::from)?; let mut catalog_conn = catalog::Connection::new(primary_conn); let node = catalog_conn .assigned_node(&self.site) @@ -58,7 +61,10 @@ pub async fn load_deployment( primary_pool: ConnectionPool, deployment: &DeploymentSelector, ) -> Result { - let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?; + let mut primary_conn = primary_pool + .get_permitted() + .await + .map_err(GraphmanError::from)?; let locator = crate::deployment::load_deployment_locator( &mut primary_conn, @@ -87,7 +93,10 @@ pub async fn reassign_deployment( node: &NodeId, curr_node: Option, ) -> Result { - let primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?; + let primary_conn = primary_pool + .get_permitted() + .await + .map_err(GraphmanError::from)?; let mut catalog_conn = catalog::Connection::new(primary_conn); let changes: Vec = match &curr_node { Some(curr) => { diff --git a/core/graphman/src/commands/deployment/resume.rs b/core/graphman/src/commands/deployment/resume.rs index 0e91a997c01..a8f9f4769e7 100644 --- a/core/graphman/src/commands/deployment/resume.rs +++ b/core/graphman/src/commands/deployment/resume.rs @@ -37,7 +37,10 @@ pub async fn load_paused_deployment( primary_pool: ConnectionPool, deployment: &DeploymentSelector, ) -> Result { - let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?; + let mut primary_conn = primary_pool + .get_permitted() + .await + .map_err(GraphmanError::from)?; let locator = crate::deployment::load_deployment_locator( &mut primary_conn, @@ -76,7 +79,7 @@ pub async fn resume_paused_deployment( notification_sender: Arc, paused_deployment: PausedDeployment, ) -> Result<(), GraphmanError> { - let primary_conn = primary_pool.get().await?; + let primary_conn = primary_pool.get_permitted().await?; let mut catalog_conn = catalog::Connection::new(primary_conn); let changes = catalog_conn diff --git a/core/graphman/src/commands/deployment/unassign.rs 
b/core/graphman/src/commands/deployment/unassign.rs index ee738de40a6..474358c297e 100644 --- a/core/graphman/src/commands/deployment/unassign.rs +++ b/core/graphman/src/commands/deployment/unassign.rs @@ -37,7 +37,10 @@ pub async fn load_assigned_deployment( primary_pool: ConnectionPool, deployment: &DeploymentSelector, ) -> Result { - let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?; + let mut primary_conn = primary_pool + .get_permitted() + .await + .map_err(GraphmanError::from)?; let locator = crate::deployment::load_deployment_locator( &mut primary_conn, @@ -73,7 +76,7 @@ pub async fn unassign_deployment( notification_sender: Arc, deployment: AssignedDeployment, ) -> Result<(), GraphmanError> { - let primary_conn = primary_pool.get().await?; + let primary_conn = primary_pool.get_permitted().await?; let mut catalog_conn = catalog::Connection::new(primary_conn); let changes = catalog_conn.unassign_subgraph(&deployment.site).await?; diff --git a/graph/src/blockchain/firehose_block_stream.rs b/graph/src/blockchain/firehose_block_stream.rs index e25b3c83676..4ec6e17c83f 100644 --- a/graph/src/blockchain/firehose_block_stream.rs +++ b/graph/src/blockchain/firehose_block_stream.rs @@ -208,11 +208,11 @@ fn stream_blocks>( // Back off exponentially whenever we encounter a connection error or a stream with bad data let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45)); - // This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments - #[allow(unused_assignments)] - let mut skip_backoff = false; - try_stream! { + // This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments + #[allow(unused_assignments)] + let mut skip_backoff = false; + loop { let endpoint = client.firehose_endpoint().await?; let logger = logger.new(o!("deployment" => deployment.clone(), "provider" => endpoint.provider.to_string())); diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 593faca3d32..0b86f416c3f 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -39,7 +39,7 @@ use futures03::future::BoxFuture; use graph_derive::CheapClone; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use slog::{error, Logger}; +use slog::Logger; use std::{ any::Any, collections::{HashMap, HashSet}, diff --git a/graph/src/blockchain/substreams_block_stream.rs b/graph/src/blockchain/substreams_block_stream.rs index 9ab5f35db4e..c359ec1a504 100644 --- a/graph/src/blockchain/substreams_block_stream.rs +++ b/graph/src/blockchain/substreams_block_stream.rs @@ -180,13 +180,13 @@ fn stream_blocks>( // Back off exponentially whenever we encounter a connection error or a stream with bad data let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45)); - // This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments - #[allow(unused_assignments)] - let mut skip_backoff = false; - let mut log_data = SubstreamsLogData::new(); try_stream! 
{ + // This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments + #[allow(unused_assignments)] + let mut skip_backoff = false; + if !modules.modules.iter().any(|m| module_name.eq(&m.name)) { Err(BlockStreamError::Fatal(format!( "module `{}` not found", diff --git a/graph/src/components/network_provider/provider_manager.rs b/graph/src/components/network_provider/provider_manager.rs index 93136c516bc..510b053b6ff 100644 --- a/graph/src/components/network_provider/provider_manager.rs +++ b/graph/src/components/network_provider/provider_manager.rs @@ -5,7 +5,6 @@ use std::time::Duration; use derivative::Derivative; use itertools::Itertools; -use slog::error; use slog::info; use slog::warn; use slog::Logger; diff --git a/node/src/config.rs b/node/src/config.rs index 83ea7bf1cc3..db2a5c203e9 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -252,7 +252,7 @@ impl Shard { fn validate(&mut self, name: &str) -> Result<()> { ShardName::new(name.to_string()).map_err(|e| anyhow!(e))?; - self.connection = shellexpand::env(&self.connection)?.into_owned(); + self.expand_connection()?; if matches!(self.pool_size, PoolSize::None) { return Err(anyhow!("missing pool size definition for shard `{}`", name)); @@ -301,6 +301,25 @@ impl Shard { replicas, }) } + + fn expand_connection(&mut self) -> Result<()> { + let mut url = Url::parse(shellexpand::env(&self.connection)?.as_ref())?; + // Put the PGAPPNAME into the URL since tokio-postgres ignores this + // environment variable + if let Ok(app_name) = std::env::var("PGAPPNAME") { + let query = match url.query() { + Some(query) => { + format!("{query}&application_name={app_name}") + } + None => { + format!("application_name={app_name}") + } + }; + url.set_query(Some(&query)); + } + self.connection = url.to_string(); + Ok(()) + } } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -1944,6 +1963,10 @@ mod tests { let query = NodeId::new("query_node_1").unwrap(); let other = NodeId::new("other_node_1").unwrap(); + let appname = std::env::var("PGAPPNAME").ok(); + unsafe { + std::env::set_var("PGAPPNAME", "config-test"); + } let shard = { let mut shard = toml::from_str::( r#" fdw_pool_size = [ shard.validate("index_node_1").unwrap(); shard }; + if let Some(appname) = appname { + unsafe { + std::env::set_var("PGAPPNAME", appname); + } + } else { + unsafe { + std::env::remove_var("PGAPPNAME"); + } + } assert_eq!( shard.connection, - "postgresql://postgres:postgres@postgres/graph" + "postgresql://postgres:postgres@postgres/graph?application_name=config-test" );  assert_eq!(shard.pool_size.size_for(&index, "ashard").unwrap(), 20);
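The effect of `expand_connection` can be reproduced in a few lines. A minimal sketch, assuming the `url` crate; `with_app_name` is an illustrative helper rather than the patch's code, and it uses `query_pairs_mut` where the patch builds the query string by hand:

```rust
use url::Url;

/// Append `application_name` to a Postgres connection URL, preserving
/// any query parameters that are already present.
fn with_app_name(conn: &str, app_name: &str) -> Result<String, url::ParseError> {
    let mut url = Url::parse(conn)?;
    // `query_pairs_mut` appends to an existing query string or creates
    // one, covering both arms of the `match` in `expand_connection`.
    url.query_pairs_mut().append_pair("application_name", app_name);
    Ok(url.to_string())
}

fn main() -> Result<(), url::ParseError> {
    let expanded = with_app_name(
        "postgresql://postgres:postgres@postgres/graph",
        "config-test",
    )?;
    assert_eq!(
        expanded,
        "postgresql://postgres:postgres@postgres/graph?application_name=config-test"
    );
    Ok(())
}
```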
diff --git a/node/src/manager/commands/assign.rs b/node/src/manager/commands/assign.rs index 234b967584f..971d8a4687f 100644 --- a/node/src/manager/commands/assign.rs +++ b/node/src/manager/commands/assign.rs @@ -13,7 +13,7 @@ pub async fn unassign( ) -> Result<(), Error> { let locator = search.locate_unique(&primary).await?; - let pconn = primary.get().await?; + let pconn = primary.get_permitted().await?; let mut conn = catalog::Connection::new(pconn); let site = conn @@ -38,7 +38,7 @@ pub async fn reassign( let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?; let locator = search.locate_unique(&primary).await?; - let pconn = primary.get().await?; + let pconn = primary.get_permitted().await?; let mut conn = catalog::Connection::new(pconn); let site = conn @@ -81,7 +81,7 @@ pub async fn pause_or_resume( locator: &DeploymentLocator, should_pause: bool, ) -> Result<(), Error> { - let pconn = primary.get().await?; + let pconn = primary.get_permitted().await?; let mut conn = catalog::Connection::new(pconn); let site = conn diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 84d54e145da..12b69d0bf4e 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -157,8 +157,9 @@ pub async fn info( pub async fn remove(primary: ConnectionPool, store: BlockStore, name: String) -> Result<(), Error> { let sites = { - let mut conn = - graph_store_postgres::command_support::catalog::Connection::new(primary.get().await?); + let mut conn = graph_store_postgres::command_support::catalog::Connection::new( + primary.get_permitted().await?, + ); conn.find_sites_for_network(&name).await? }; diff --git a/node/src/manager/commands/rewind.rs b/node/src/manager/commands/rewind.rs index 236dcd3fdd9..b407089d32a 100644 --- a/node/src/manager/commands/rewind.rs +++ b/node/src/manager/commands/rewind.rs @@ -78,7 +78,7 @@ pub async fn run( if !start_block && (block_hash.is_none() || block_number.is_none()) { bail!("--block-hash and --block-number must be specified when --start-block is not set"); } - let pconn = primary.get().await?; + let pconn = primary.get_permitted().await?; let mut conn = store_catalog::Connection::new(pconn); let subgraph_store = store.subgraph_store(); diff --git a/node/src/manager/commands/stats.rs b/node/src/manager/commands/stats.rs index 651a30b45b6..34405b05e9b 100644 --- a/node/src/manager/commands/stats.rs +++ b/node/src/manager/commands/stats.rs @@ -23,7 +23,7 @@ async fn site_and_conn( let primary_pool = pools.get(&*PRIMARY_SHARD).unwrap(); let locator = search.locate_unique(primary_pool).await?; - let pconn = primary_pool.get().await?; + let pconn = primary_pool.get_permitted().await?; let mut conn = store_catalog::Connection::new(pconn); let site = conn diff --git a/server/graphman/src/resolvers/deployment_mutation/create.rs b/server/graphman/src/resolvers/deployment_mutation/create.rs index 3f8cc5d8e20..52e30bc6504 100644 --- a/server/graphman/src/resolvers/deployment_mutation/create.rs +++ b/server/graphman/src/resolvers/deployment_mutation/create.rs @@ -7,7 +7,11 @@ use crate::resolvers::context::GraphmanContext; use graphman::GraphmanError; pub async fn run(ctx: &GraphmanContext, name: &String) -> Result<()> { - let primary_pool = ctx.primary_pool.get().await.map_err(GraphmanError::from)?; + let primary_pool = ctx + .primary_pool + .get_permitted() + .await + .map_err(GraphmanError::from)?; let mut catalog_conn = catalog::Connection::new(primary_pool); let name = match SubgraphName::new(name) { diff --git a/server/graphman/src/resolvers/deployment_mutation/remove.rs b/server/graphman/src/resolvers/deployment_mutation/remove.rs index e889896b3aa..c7997b6885f 100644 --- a/server/graphman/src/resolvers/deployment_mutation/remove.rs +++ b/server/graphman/src/resolvers/deployment_mutation/remove.rs @@ -7,7 +7,11 @@ use crate::resolvers::context::GraphmanContext; use graphman::GraphmanError; pub async fn run(ctx: &GraphmanContext, name: &String) -> Result<()> { - let primary_pool = ctx.primary_pool.get().await.map_err(GraphmanError::from)?; + let primary_pool = ctx + .primary_pool + .get_permitted() + .await + .map_err(GraphmanError::from)?; let mut catalog_conn = catalog::Connection::new(primary_pool); let name = match SubgraphName::new(name) { diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index e677623fbcc..c07e9bdc732 100644 ---
a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -164,7 +164,7 @@ pub mod primary { } pub(super) async fn drop_chain(pool: &ConnectionPool, name: &str) -> Result<(), StoreError> { - let mut conn = pool.get().await?; + let mut conn = pool.get_permitted().await?; delete(chains::table.filter(chains::name.eq(name))) .execute(&mut conn) @@ -427,7 +427,7 @@ impl BlockStore { let cached = match self.chain_head_cache.get(shard.as_str()) { Some(cached) => cached, None => { - let mut conn = match pool.get().await { + let mut conn = match pool.get_permitted().await { Ok(conn) => conn, Err(StoreError::DatabaseUnavailable) => continue, Err(e) => return Err(e), @@ -568,7 +568,7 @@ impl BlockStore { use crate::primary::db_version as dbv; let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap(); - let mut conn = primary_pool.get().await?; + let mut conn = primary_pool.get_permitted().await?; let version: i64 = dbv::table .select(dbv::version) .get_result(&mut conn) @@ -667,7 +667,7 @@ impl ChainIdStore for BlockStore { // Update the master copy in the primary let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap(); - let mut conn = primary_pool.get().await?; + let mut conn = primary_pool.get_permitted().await?; diesel::update(c::table.filter(c::name.eq(chain_name.as_str()))) .set(( diff --git a/store/postgres/src/chain_head_listener.rs b/store/postgres/src/chain_head_listener.rs index 16c658e52df..b9faa164f0b 100644 --- a/store/postgres/src/chain_head_listener.rs +++ b/store/postgres/src/chain_head_listener.rs @@ -261,7 +261,7 @@ impl ChainHeadUpdateSender { "head_block_number": number }); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.sender .notify( &mut conn, diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index bc62b426f98..eadde677f96 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1972,6 +1972,7 @@ impl ChainStoreMetrics { enum BlocksLookupResult { ByHash(Arc, StoreError>>), ByNumber(Arc>, StoreError>>), + Ancestor(Arc, StoreError>>), } pub struct ChainStore { @@ -2024,7 +2025,7 @@ impl ChainStore { pub(crate) async fn create(&self, ident: &ChainIdentifier) -> Result<(), Error> { use public::ethereum_networks::dsl::*; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| { async move { insert_into(ethereum_networks) @@ -2051,7 +2052,7 @@ impl ChainStore { pub async fn update_name(&self, name: &str) -> Result<(), Error> { use public::ethereum_networks as n; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| { async { update(n::table.filter(n::name.eq(&self.chain))) @@ -2069,7 +2070,7 @@ impl ChainStore { use diesel::dsl::delete; use public::ethereum_networks as n; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| { async { self.storage.drop_storage(conn, &self.chain).await?; @@ -2111,7 +2112,7 @@ impl ChainStore { let number: Option = n::table .filter(n::name.eq(chain)) .select(n::head_block_number) - .first::>(&mut self.pool.get().await?) + .first::>(&mut self.pool.get_permitted().await?) .await .optional()? 
.flatten(); @@ -2131,7 +2132,7 @@ impl ChainStore { pub(crate) async fn set_chain_identifier(&self, ident: &ChainIdentifier) -> Result<(), Error> { use public::ethereum_networks as n; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; diesel::update(n::table.filter(n::name.eq(&self.chain))) .set(( @@ -2195,14 +2196,14 @@ impl ChainStore { } pub async fn delete_blocks(&self, block_hashes: &[&H256]) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage .delete_blocks_by_hash(&mut conn, &self.chain, block_hashes) .await } pub async fn cleanup_shallow_blocks(&self, lowest_block: i32) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage .cleanup_shallow_blocks(&mut conn, lowest_block) .await?; @@ -2211,12 +2212,12 @@ impl ChainStore { // remove_cursor delete the chain_store cursor and return true if it was present pub async fn remove_cursor(&self, chain: &str) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage.remove_cursor(&mut conn, chain).await } pub async fn truncate_block_cache(&self) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage.truncate_block_cache(&mut conn).await?; Ok(()) } @@ -2225,7 +2226,7 @@ impl ChainStore { self: &Arc, hashes: Vec, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let values = self.storage.blocks(&mut conn, &self.chain, &hashes).await?; Ok(values) } @@ -2234,7 +2235,7 @@ impl ChainStore { self: &Arc, numbers: Vec, ) -> Result>, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let values = self .storage .block_ptrs_by_numbers(&mut conn, &self.chain, &numbers) @@ -2261,7 +2262,7 @@ impl ChainStore { let genesis_block_ptr = self.genesis_block_ptr().await?.hash_as_h256(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let candidate = self .storage .chain_head_candidate(&mut conn, &self.chain) @@ -2309,8 +2310,8 @@ impl ChainStore { /// Helper for tests that need to directly modify the tables for the /// chain store #[cfg(debug_assertions)] - pub async fn get_conn_for_test(&self) -> Result { - let conn = self.pool.get().await?; + pub async fn get_conn_for_test(&self) -> Result { + let conn = self.pool.get_permitted().await?; Ok(conn) } } @@ -2336,7 +2337,7 @@ impl ChainHeadStore for ChainStore { async fn chain_head_ptr(self: Arc) -> Result, Error> { use public::ethereum_networks::dsl::*; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; Ok(ethereum_networks .select((head_block_hash, head_block_number)) .filter(name.eq(&self.chain)) @@ -2369,7 +2370,7 @@ impl ChainHeadStore for ChainStore { ethereum_networks .select(head_block_cursor) .filter(name.eq(&self.chain)) - .load::>(&mut self.pool.get().await?) + .load::>(&mut self.pool.get_permitted().await?) 
.await .map(|rows| { rows.as_slice() @@ -2394,7 +2395,7 @@ impl ChainHeadStore for ChainStore { //this will send an update via postgres, channel: chain_head_updates self.chain_head_update_sender.send(&hash, number).await?; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| { async { self.storage @@ -2437,7 +2438,7 @@ impl ChainStoreTrait for ChainStore { self.recent_blocks_cache.insert_block(block); } - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| { self.storage .upsert_block(conn, &self.chain, block.as_ref(), true) @@ -2448,7 +2449,7 @@ impl ChainStoreTrait for ChainStore { } async fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; for block in blocks { self.storage .upsert_block(&mut conn, &self.chain, *block, false) @@ -2587,9 +2588,9 @@ impl ChainStoreTrait for ChainStore { // so ByNumber variant is structurally impossible here. let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await { (BlocksLookupResult::ByHash(res), _) => res, - (BlocksLookupResult::ByNumber(_), _) => { + (BlocksLookupResult::ByNumber(_) | BlocksLookupResult::Ancestor(_), _) => { Arc::new(Err(StoreError::Unknown(anyhow::anyhow!( - "Unexpected BlocksLookupResult::ByNumber returned from cached block lookup by hash" + "Unexpected BlocksLookupResult variant returned from cached block lookup by hash" )))) } }; @@ -2635,19 +2636,72 @@ impl ChainStoreTrait for ChainStore { block_ptr.hash_hex() ); - // Check the local cache first. - let block_cache = self - .recent_blocks_cache - .get_ancestor(&block_ptr, offset) - .and_then(|x| Some(x.0).zip(x.1)); - if let Some((ptr, data)) = block_cache { - return Ok(Some((data, ptr))); + // Use herd cache to avoid thundering herd when multiple callers + // request the same ancestor block simultaneously. The cache check + // is inside the future so that only one caller checks and populates + // the cache. + let hash = crypto_stable_hash(&(&block_ptr, offset, &root)); + let this = self.cheap_clone(); + let lookup_fut = async move { + let res: Result, StoreError> = async { + // Check the local cache first. 
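+ // Since this check runs inside `lookup_fut`, callers that coalesce on the herd cache never probe the recent blocks cache themselves; `register_hit()` below credits them with a cache hit when that happens.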
+ let block_cache = this + .recent_blocks_cache + .get_ancestor(&block_ptr, offset) + .and_then(|x| Some(x.0).zip(x.1)); + if let Some((ptr, data)) = block_cache { + return Ok(Some((data, ptr))); + } + + // Cache miss, query the database + let mut conn = this.pool.get_permitted().await?; + let result = this + .storage + .ancestor_block(&mut conn, block_ptr, offset, root) + .await + .map_err(StoreError::from)?; + + // Insert into cache if we got a result + if let Some((ref data, ref ptr)) = result { + // Extract parent_hash from data["block"]["parentHash"] or + // data["parentHash"] + if let Some(parent_hash) = data + .get("block") + .unwrap_or(data) + .get("parentHash") + .and_then(|h| h.as_str()) + .and_then(|h| h.parse().ok()) + { + let block = JsonBlock::new(ptr.clone(), parent_hash, Some(data.clone())); + this.recent_blocks_cache.insert_block(block); + } + } + + Ok(result) + } + .await; + BlocksLookupResult::Ancestor(Arc::new(res)) + }; + let lookup_herd = self.lookup_herd.cheap_clone(); + let logger = self.logger.cheap_clone(); + let (res, cached) = match lookup_herd.cached_query(hash, lookup_fut, &logger).await { + (BlocksLookupResult::Ancestor(res), cached) => (res, cached), + _ => { + return Err(anyhow!( + "Unexpected BlocksLookupResult variant returned from cached ancestor block lookup" + )) + } + }; + let result = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone())?; + + if cached { + // If we had a hit in the herd cache, we never ran lookup_fut + // but we want to pretend that we actually looked the value up + // from the recent blocks cache + self.recent_blocks_cache.register_hit(); } - let mut conn = self.pool.get().await?; - self.storage - .ancestor_block(&mut conn, block_ptr, offset, root) - .await + Ok(result) } async fn cleanup_cached_blocks( @@ -2681,7 +2735,7 @@ impl ChainStoreTrait for ChainStore { // // See 8b6ad0c64e244023ac20ced7897fe666 - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let query = " select coalesce( least(a.block, @@ -2727,7 +2781,7 @@ impl ChainStoreTrait for ChainStore { &self, number: BlockNumber, ) -> Result, Error> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage .block_hashes_by_block_number(&mut conn, &self.chain, number) .await @@ -2738,7 +2792,7 @@ impl ChainStoreTrait for ChainStore { number: BlockNumber, hash: &BlockHash, ) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage .confirm_block_hash(&mut conn, &self.chain, number, hash) .await @@ -2748,7 +2802,7 @@ impl ChainStoreTrait for ChainStore { &self, hash: &BlockHash, ) -> Result, Option)>, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage.block_number(&mut conn, hash).await.map(|opt| { opt.map(|(number, timestamp, parent_hash)| { (self.chain.clone(), number, timestamp, parent_hash) @@ -2764,14 +2818,14 @@ impl ChainStoreTrait for ChainStore { return Ok(HashMap::new()); } - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage .block_numbers(&mut conn, hashes.as_slice()) .await } async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; if let Some(head) = self.chain_head_block(&self.chain).await? 
{ self.storage .clear_call_cache(&mut conn, head, from, to) @@ -2785,7 +2839,7 @@ impl ChainStoreTrait for ChainStore { ttl_days: i32, ttl_max_contracts: Option, ) -> Result<(), Error> { - let conn = &mut self.pool.get().await?; + let conn = &mut self.pool.get_permitted().await?; self.storage .clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts) .await @@ -2795,7 +2849,7 @@ impl ChainStoreTrait for ChainStore { &self, block_hash: &H256, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.storage .find_transaction_receipts_in_block(&mut conn, *block_hash) .await @@ -2803,7 +2857,7 @@ impl ChainStoreTrait for ChainStore { } async fn chain_identifier(&self) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; use public::ethereum_networks as n; let (genesis_block_hash, net_version) = n::table .select((n::genesis_block_hash, n::net_version)) @@ -2931,6 +2985,11 @@ mod recent_blocks_cache { } } + pub fn register_hit(&self) { + let inner = self.inner.read(); + inner.metrics.record_cache_hit(&inner.network); + } + pub fn clear(&self) { self.inner.write().blocks.clear(); self.inner.read().update_write_metrics(); @@ -3045,7 +3104,7 @@ impl EthereumCallCache for ChainStore { block: BlockPtr, ) -> Result, Error> { let id = contract_call_id(req, &block); - let conn = &mut self.pool.get().await?; + let conn = &mut self.pool.get_permitted().await?; let return_value = conn .transaction::<_, Error, _>(|conn| { async { @@ -3086,7 +3145,7 @@ impl EthereumCallCache for ChainStore { .collect(); let id_refs: Vec<_> = ids.iter().map(|id| id.as_slice()).collect(); - let conn = &mut self.pool.get().await?; + let conn = &mut self.pool.get_permitted().await?; let rows = conn .transaction::<_, Error, _>(|conn| { self.storage @@ -3120,7 +3179,7 @@ impl EthereumCallCache for ChainStore { } async fn get_calls_in_block(&self, block: BlockPtr) -> Result, Error> { - let conn = &mut self.pool.get().await?; + let conn = &mut self.pool.get_permitted().await?; conn.transaction::<_, Error, _>(|conn| { self.storage.get_calls_in_block(conn, block).scope_boxed() }) @@ -3149,7 +3208,7 @@ impl EthereumCallCache for ChainStore { }; let id = contract_call_id(&call, &block); - let conn = &mut self.pool.get().await?; + let conn = &mut self.pool.get_permitted().await?; conn.transaction::<_, anyhow::Error, _>(|conn| { self.storage .set_call( diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 71166f3a6f1..97de0baca24 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -189,7 +189,7 @@ impl DeploymentStore { on_sync: OnSync, index_def: Option, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction::<_, StoreError, _>(|conn| { async { let exists = deployment::exists(conn, &site).await?; @@ -259,7 +259,7 @@ impl DeploymentStore { &self, site: Arc, ) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site.clone()).await?; Ok( detail::deployment_entity(&mut conn, &site, &layout.input_schema) @@ -271,7 +271,7 @@ impl DeploymentStore { // Remove the data and metadata for the deployment `site`. 
This operation // is not reversible pub(crate) async fn drop_deployment(&self, site: &Site) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| { async { crate::deployment::drop_schema(conn, &site.namespace).await?; @@ -388,16 +388,19 @@ impl DeploymentStore { } /// Panics if `idx` is not a valid index for a read only pool. - async fn read_only_conn(&self, idx: usize) -> Result { - self.read_only_pools[idx].get().await.map_err(Error::from) + async fn read_only_conn(&self, idx: usize) -> Result { + self.read_only_pools[idx] + .get_permitted() + .await + .map_err(Error::from) } pub(crate) async fn get_replica_conn( &self, replica: ReplicaId, - ) -> Result { + ) -> Result { let conn = match replica { - ReplicaId::Main => self.pool.get().await?, + ReplicaId::Main => self.pool.get_permitted().await?, ReplicaId::ReadOnly(idx) => self.read_only_conn(idx).await?, }; Ok(conn) @@ -440,7 +443,7 @@ impl DeploymentStore { return Ok(layout); } - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.layout(&mut conn, site).await } @@ -504,7 +507,7 @@ impl DeploymentStore { return Ok(info.clone()); } - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; self.subgraph_info_with_conn(&mut conn, site).await } @@ -519,7 +522,7 @@ impl DeploymentStore { &self, ids: Vec, ) -> Result, StoreError> { - let conn = &mut self.pool.get().await?; + let conn = &mut self.pool.get_permitted().await?; detail::deployment_details(conn, ids).await } @@ -528,7 +531,7 @@ impl DeploymentStore { locator: &DeploymentLocator, ) -> Result { let id = DeploymentId::from(locator.clone()); - let conn = &mut self.pool.get().await?; + let conn = &mut self.pool.get_permitted().await?; detail::deployment_details_for_id(conn, &id).await } @@ -536,7 +539,7 @@ impl DeploymentStore { &self, sites: &[Arc], ) -> Result, StoreError> { - let conn = &mut self.pool.get().await?; + let conn = &mut self.pool.get_permitted().await?; detail::deployment_statuses(conn, sites).await } @@ -544,7 +547,7 @@ impl DeploymentStore { &self, id: &DeploymentHash, ) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::exists_and_synced(&mut conn, id.as_str()).await } @@ -553,14 +556,14 @@ impl DeploymentStore { id: &DeploymentHash, block_ptr: BlockPtr, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| deployment::set_synced(conn, id, block_ptr).scope_boxed()) .await } /// Look up the on_sync action for this deployment pub(crate) async fn on_sync(&self, site: &Site) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::on_sync(&mut conn, site.id).await } @@ -570,7 +573,7 @@ impl DeploymentStore { &self, site: &Site, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; crate::copy::source(&mut conn, site).await } @@ -580,7 +583,7 @@ impl DeploymentStore { &self, namespace: &crate::primary::Namespace, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::drop_schema(&mut conn, namespace).await } @@ -602,7 +605,7 @@ impl DeploymentStore { delete from active_copies; "; - let mut conn = self.pool.get().await?; + let mut conn = 
self.pool.get_permitted().await?; conn.batch_execute(QUERY).await?; conn.batch_execute("delete from deployment_schemas;") .await?; @@ -610,7 +613,7 @@ impl DeploymentStore { } pub(crate) async fn vacuum(&self) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.batch_execute("vacuum (analyze) subgraphs.head, subgraphs.deployment") .await?; Ok(()) @@ -622,7 +625,7 @@ impl DeploymentStore { site: Arc, entity: Option<&str>, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site).await?; let tables = entity .map(|entity| resolve_table_name(&layout, entity)) @@ -639,7 +642,7 @@ impl DeploymentStore { &self, site: Arc, ) -> Result<(i32, BTreeMap>), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let default = catalog::default_stats_target(&mut conn).await?; let targets = catalog::stats_targets(&mut conn, &site.namespace).await?; @@ -653,7 +656,7 @@ impl DeploymentStore { columns: Vec, target: i32, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site.clone()).await?; let tables = entity @@ -704,7 +707,7 @@ impl DeploymentStore { ) -> Result<(), StoreError> { let store = self.clone(); let entity_name = entity_name.to_owned(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let schema_name = site.namespace.clone(); let layout = store.layout(&mut conn, site).await?; let (index_name, sql) = @@ -735,7 +738,7 @@ impl DeploymentStore { ) -> Result, StoreError> { let store = self.clone(); let entity_name = entity_name.to_owned(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let schema_name = site.namespace.clone(); let layout = store.layout(&mut conn, site).await?; let table = resolve_table_name(&layout, &entity_name)?; @@ -747,9 +750,11 @@ impl DeploymentStore { Ok(indexes.into_iter().map(CreateIndex::parse).collect()) } + /// Do not use this while already holding a connection as that can lead + /// to deadlocks pub(crate) async fn load_indexes(&self, site: Arc) -> Result { let store = self.clone(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; IndexList::load(&mut conn, site, store).await } @@ -760,7 +765,7 @@ impl DeploymentStore { index_name: &str, ) -> Result<(), StoreError> { let index_name = String::from(index_name); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let schema_name = site.namespace.clone(); catalog::drop_index(&mut conn, schema_name.as_str(), &index_name).await } @@ -773,7 +778,7 @@ impl DeploymentStore { ) -> Result<(), StoreError> { let store = self.clone(); let table = table.to_string(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = store.layout(&mut conn, site.clone()).await?; let table = resolve_table_name(&layout, &table)?; catalog::set_account_like(&mut conn, &site, &table.name, is_account_like).await @@ -798,7 +803,7 @@ impl DeploymentStore { // will use the updated value self.layout_cache.remove(site); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::set_history_blocks(&mut conn, site, history_blocks).await } @@ -842,7 +847,7 @@ impl 
DeploymentStore { } let store = self.clone(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; // We lock pruning for this deployment to make sure that if the // deployment is reassigned to another node, that node won't // kick off a pruning run while this node might still be pruning @@ -859,7 +864,7 @@ impl DeploymentStore { self: &Arc, site: Arc, ) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site.clone()).await?; Ok(relational::prune::Viewer::new(self.pool.clone(), layout)) @@ -871,14 +876,14 @@ impl DeploymentStore { pub(crate) async fn block_ptr(&self, site: Arc) -> Result, StoreError> { let site = site.cheap_clone(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; Self::block_ptr_with_conn(&mut conn, site).await } pub(crate) async fn block_cursor(&self, site: Arc) -> Result { let site = site.cheap_clone(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::get_subgraph_firehose_cursor(&mut conn, site) .await .map(FirehoseCursor::from) @@ -890,7 +895,7 @@ impl DeploymentStore { ) -> Result, StoreError> { let store = self.cheap_clone(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = store.layout(&mut conn, site.cheap_clone()).await?; layout.last_rollup(&mut conn).await } @@ -908,7 +913,7 @@ impl DeploymentStore { let info = self.subgraph_info(site.cheap_clone()).await?; let poi_digest = layout.input_schema.poi_digest(); - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let entities: Option<(Vec, BlockPtr)> = { let site = site.clone(); @@ -994,7 +999,7 @@ impl DeploymentStore { key: &EntityKey, block: BlockNumber, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site).await?; layout.find(&mut conn, key, block).await } @@ -1010,7 +1015,7 @@ impl DeploymentStore { if ids_for_type.is_empty() { return Ok(BTreeMap::new()); } - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site).await?; layout.find_many(&mut conn, ids_for_type, block).await @@ -1023,7 +1028,7 @@ impl DeploymentStore { causality_region: CausalityRegion, block_range: Range, ) -> Result>, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site).await?; layout .find_range(&mut conn, entity_types, causality_region, block_range) @@ -1037,7 +1042,7 @@ impl DeploymentStore { block: BlockNumber, excluded_keys: &Vec, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site).await?; layout .find_derived(&mut conn, derived_query, block, excluded_keys) @@ -1049,7 +1054,7 @@ impl DeploymentStore { site: Arc, block: BlockNumber, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let layout = self.layout(&mut conn, site).await?; let changes = layout.find_changes(&mut conn, block).await?; @@ -1063,7 +1068,7 @@ impl DeploymentStore { site: Arc, query: EntityQuery, ) -> Result, QueryExecutionError> { - let mut conn = self.pool.get().await?; + let mut conn = 
self.pool.get_permitted().await?; self.execute_query(&mut conn, site, query) .await .map(|(entities, _)| entities) @@ -1080,7 +1085,7 @@ impl DeploymentStore { ) -> Result<(), StoreError> { let mut conn = { let _section = stopwatch.start_section("transact_blocks_get_conn"); - self.pool.get().await? + self.pool.get_permitted().await? }; let (layout, earliest_block) = deployment::with_lock(&mut conn, &site, async |conn| { @@ -1315,7 +1320,7 @@ impl DeploymentStore { site: Arc, block_ptr_to: BlockPtr, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let block_ptr_from = Self::block_ptr_with_conn(&mut conn, site.cheap_clone()).await?; @@ -1346,7 +1351,7 @@ impl DeploymentStore { site: Arc, block_ptr_to: BlockPtr, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let block_ptr_from = Self::block_ptr_with_conn(&mut conn, site.cheap_clone()).await?; @@ -1378,7 +1383,7 @@ impl DeploymentStore { block_ptr_to: BlockPtr, firehose_cursor: &FirehoseCursor, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; // Unwrap: If we are reverting then the block ptr is not `None`. let deployment_head = Self::block_ptr_with_conn(&mut conn, site.cheap_clone()) .await? @@ -1414,7 +1419,7 @@ impl DeploymentStore { &self, site: Arc, ) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::state(&mut conn, &site).await } @@ -1423,7 +1428,7 @@ impl DeploymentStore { id: DeploymentHash, error: SubgraphError, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| deployment::fail(conn, &id, &error).scope_boxed()) .await } @@ -1449,7 +1454,7 @@ impl DeploymentStore { block: BlockNumber, manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; crate::dynds::load(&mut conn, &site, block, manifest_idx_and_name).await } @@ -1457,12 +1462,12 @@ impl DeploymentStore { &self, site: Arc, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; crate::dynds::causality_region_curr_val(&mut conn, &site).await } pub(crate) async fn exists_and_synced(&self, id: DeploymentHash) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::exists_and_synced(&mut conn, &id).await } @@ -1470,7 +1475,7 @@ impl DeploymentStore { &self, id: &DeploymentHash, ) -> Result, StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::graft_pending(&mut conn, id).await } @@ -1529,7 +1534,7 @@ impl DeploymentStore { return Err(StoreError::Canceled); } - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; conn.transaction::<(), StoreError, _>(|conn| { async { // Copy shared dynamic data sources and adjust their ID; if @@ -1603,10 +1608,11 @@ impl DeploymentStore { .await?; } - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; if ENV_VARS.postpone_attribute_index_creation { - // check if all indexes are valid and recreate them if they aren't - self.load_indexes(site.clone()) + // Check if all indexes are valid and recreate them if they + // 
aren't. + IndexList::load(&mut conn, site, self.cheap_clone()) .await? .recreate_invalid_indexes(&mut conn, &dst) .await?; @@ -1638,7 +1644,7 @@ impl DeploymentStore { current_ptr: &BlockPtr, parent_ptr: &BlockPtr, ) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let deployment_id = &site.deployment; conn.transaction(|conn| { @@ -1735,7 +1741,7 @@ impl DeploymentStore { site: Arc, current_ptr: &BlockPtr, ) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let deployment_id = &site.deployment; conn.transaction(|conn| async { @@ -1798,7 +1804,7 @@ impl DeploymentStore { #[cfg(debug_assertions)] pub async fn error_count(&self, id: &DeploymentHash) -> Result { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::error_count(&mut conn, id).await } @@ -1820,7 +1826,7 @@ impl DeploymentStore { "info.subgraph_sizes", "info.chain_sizes", ]; - let mut conn = store.pool.get().await?; + let mut conn = store.pool.get_permitted().await?; for view in VIEWS { let query = format!("refresh materialized view {}", view); diesel::sql_query(&query).execute(&mut conn).await?; @@ -1840,7 +1846,7 @@ impl DeploymentStore { site: &Site, ) -> Result { let id = site.id; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::health(&mut conn, id).await } @@ -1849,7 +1855,7 @@ impl DeploymentStore { site: Arc, raw_yaml: String, ) -> Result<(), StoreError> { - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; deployment::set_manifest_raw_yaml(&mut conn, &site, &raw_yaml).await } diff --git a/store/postgres/src/graphman/mod.rs b/store/postgres/src/graphman/mod.rs index 30fcd47fb58..0c856c0e75b 100644 --- a/store/postgres/src/graphman/mod.rs +++ b/store/postgres/src/graphman/mod.rs @@ -28,7 +28,7 @@ impl GraphmanStore { #[async_trait] impl graphman_store::GraphmanStore for GraphmanStore { async fn new_execution(&self, kind: CommandKind) -> Result { - let mut conn = self.primary_pool.get().await?; + let mut conn = self.primary_pool.get_permitted().await?; let id: i64 = diesel::insert_into(gce::table) .values(( @@ -44,14 +44,14 @@ impl graphman_store::GraphmanStore for GraphmanStore { } async fn load_execution(&self, id: ExecutionId) -> Result { - let mut conn = self.primary_pool.get().await?; + let mut conn = self.primary_pool.get_permitted().await?; let execution = gce::table.find(id).first(&mut conn).await?; Ok(execution) } async fn mark_execution_as_running(&self, id: ExecutionId) -> Result<()> { - let mut conn = self.primary_pool.get().await?; + let mut conn = self.primary_pool.get_permitted().await?; diesel::update(gce::table) .set(( @@ -67,7 +67,7 @@ impl graphman_store::GraphmanStore for GraphmanStore { } async fn mark_execution_as_failed(&self, id: ExecutionId, error_message: String) -> Result<()> { - let mut conn = self.primary_pool.get().await?; + let mut conn = self.primary_pool.get_permitted().await?; diesel::update(gce::table) .set(( @@ -83,7 +83,7 @@ impl graphman_store::GraphmanStore for GraphmanStore { } async fn mark_execution_as_succeeded(&self, id: ExecutionId) -> Result<()> { - let mut conn = self.primary_pool.get().await?; + let mut conn = self.primary_pool.get_permitted().await?; diesel::update(gce::table) .set(( diff --git a/store/postgres/src/pool/manager.rs b/store/postgres/src/pool/manager.rs index 6cff0f3c844..fdca61d2ca6 100644 --- 
a/store/postgres/src/pool/manager.rs +++ b/store/postgres/src/pool/manager.rs @@ -23,7 +23,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::RwLock; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::pool::AsyncPool; @@ -241,8 +241,13 @@ pub(crate) fn spawn_size_stat_collector( /// Reap connections that are too old (older than 30 minutes) or if there /// are more than `connection_min_idle` connections in the pool that have /// been idle for longer than `idle_timeout` -pub(crate) fn spawn_connection_reaper(pool: AsyncPool, idle_timeout: Duration) { +pub(crate) fn spawn_connection_reaper( + pool: AsyncPool, + idle_timeout: Duration, + wait_gauge: Option, +) { const MAX_LIFETIME: Duration = Duration::from_secs(30 * 60); + const CHECK_INTERVAL: Duration = Duration::from_secs(30); let Some(min_idle) = ENV_VARS.store.connection_min_idle else { // If this is None, we will never reap anything return; @@ -254,7 +259,9 @@ pub(crate) fn spawn_connection_reaper(pool: AsyncPool, idle_timeout: Duration) { tokio::task::spawn(async move { loop { let mut idle_count = 0; + let mut last_used = Instant::now() - 2 * CHECK_INTERVAL; pool.retain(|_, metrics| { + last_used = last_used.max(metrics.recycled.unwrap_or(metrics.created)); if metrics.age() > MAX_LIFETIME { return false; } @@ -264,13 +271,18 @@ pub(crate) fn spawn_connection_reaper(pool: AsyncPool, idle_timeout: Duration) { } true }); - tokio::time::sleep(Duration::from_secs(30)).await; + if last_used.elapsed() > CHECK_INTERVAL { + // Reset wait time if there was no activity recently so that + // we don't report stale wait times + wait_gauge.as_ref().map(|wait_gauge| wait_gauge.set(0.0)); + } + tokio::time::sleep(CHECK_INTERVAL).await; } }); } pub(crate) struct WaitMeter { - wait_gauge: Gauge, + pub(crate) wait_gauge: Gauge, pub(crate) wait_stats: PoolWaitStats, } diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs index b7d3bda1db9..cd44b32463e 100644 --- a/store/postgres/src/pool/mod.rs +++ b/store/postgres/src/pool/mod.rs @@ -17,8 +17,10 @@ use graph::prelude::{ use graph::prelude::{tokio, MetricsRegistry}; use graph::slog::warn; use graph::util::timed_rw_lock::TimedMutex; +use tokio::sync::OwnedSemaphorePermit; use std::fmt::{self}; +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::time::Duration; use std::{collections::HashMap, sync::RwLock}; @@ -42,6 +44,27 @@ type AsyncPool = deadpool::managed::Pool; /// A database connection for asynchronous diesel operations pub type AsyncPgConnection = deadpool::managed::Object; +/// A database connection bundled with a semaphore permit. +/// The permit is held for the lifetime of the connection, providing +/// backpressure to prevent pool exhaustion during mass operations. +pub struct PermittedConnection { + conn: AsyncPgConnection, + _permit: OwnedSemaphorePermit, +} + +impl Deref for PermittedConnection { + type Target = AsyncPgConnection; + fn deref(&self) -> &Self::Target { + &self.conn + } +} + +impl DerefMut for PermittedConnection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.conn + } +} + /// The namespace under which the `PRIMARY_TABLES` are mapped into each /// shard pub(crate) const PRIMARY_PUBLIC: &'static str = "primary_public"; @@ -319,6 +342,14 @@ impl ConnectionPool { self.get_ready()?.get().await } + /// Get a connection with backpressure via semaphore permit. Use this + /// for indexing operations to prevent pool exhaustion. 
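+ /// The permit is held for the lifetime of the returned connection, so a + /// long-running transaction counts against the concurrency budget until + /// the connection is dropped.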
This method waits + /// indefinitely until both a permit and a connection are + /// available. + pub async fn get_permitted(&self) -> Result<PermittedConnection, StoreError> { + self.get_ready()?.get_permitted().await + } + /// Get a connection from the pool for foreign data wrapper access; /// since that pool can be very contended, periodically log that we are /// still waiting for a connection @@ -404,6 +435,15 @@ pub struct PoolInner { query_semaphore: Arc<tokio::sync::Semaphore>, semaphore_wait_stats: Arc<RwLock<MovingStats>>, semaphore_wait_gauge: Box<Gauge>, + + // Limits concurrent indexing operations to prevent pool exhaustion + // during mass subgraph startup or high write load. Provides + // backpressure similar to the old `with_conn` limiter that was removed + // during the diesel-async migration. It also avoids timeouts caused by + // pool exhaustion when getting a connection. + indexing_semaphore: Arc<tokio::sync::Semaphore>, + indexing_semaphore_wait_stats: Arc<RwLock<MovingStats>>, + indexing_semaphore_wait_gauge: Box<Gauge>, } impl PoolInner { @@ -458,10 +498,14 @@ impl PoolInner { manager::spawn_size_stat_collector(pool.clone(), &registry, const_labels.clone()); - manager::spawn_connection_reaper(pool.clone(), ENV_VARS.store.connection_idle_timeout); - let wait_meter = WaitMeter::new(&registry, const_labels.clone()); + manager::spawn_connection_reaper( + pool.clone(), + ENV_VARS.store.connection_idle_timeout, + Some(wait_meter.wait_gauge.clone()), + ); + let fdw_pool = fdw_pool_size.map(|pool_size| { let fdw_timeouts = Timeouts { wait: Some(ENV_VARS.store.connection_timeout), @@ -478,21 +522,34 @@ impl PoolInner { .build() .expect("failed to create fdw connection pool"); - manager::spawn_connection_reaper(fdw_pool.clone(), FDW_IDLE_TIMEOUT); + manager::spawn_connection_reaper(fdw_pool.clone(), FDW_IDLE_TIMEOUT, None); fdw_pool }); info!(logger_store, "Pool successfully connected to Postgres"); + let max_concurrent_queries = pool_size as usize + ENV_VARS.store.extra_query_permits; + + // Query semaphore for GraphQL queries let semaphore_wait_gauge = registry .new_gauge( "query_semaphore_wait_ms", "Moving average of time spent on waiting for postgres query semaphore", - const_labels, + const_labels.clone(), ) .expect("failed to create `query_semaphore_wait_ms` gauge"); - let max_concurrent_queries = pool_size as usize + ENV_VARS.store.extra_query_permits; let query_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_queries)); + + // Semaphore for getting PermittedConnection + let indexing_semaphore_wait_gauge = registry + .new_gauge( + "store_semaphore_wait_ms", + "Moving average of time spent waiting for connection semaphore", + const_labels, + ) + .expect("failed to create store_semaphore_wait_ms gauge"); + let indexing_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_queries)); + PoolInner { logger: logger_pool, shard, @@ -504,6 +561,9 @@ impl PoolInner { semaphore_wait_stats: Arc::new(RwLock::new(MovingStats::default())), query_semaphore, semaphore_wait_gauge, + indexing_semaphore, + indexing_semaphore_wait_stats: Arc::new(RwLock::new(MovingStats::default())), + indexing_semaphore_wait_gauge, } } @@ -512,6 +572,10 @@ impl PoolInner { /// /// If `timeouts` is `None`, the default pool timeouts are used. /// + /// The `prev_wait` duration is the time already spent waiting for a + /// permit to get a connection; that time is added to the total wait + /// time recorded. + /// /// On error, returns `StoreError::DatabaseUnavailable` and marks the /// pool as unavailable if we can tell that the error is due to the pool /// being closed.
Returns `StoreError::StatementTimeout` if the error is @@ -520,13 +584,14 @@ impl PoolInner { &self, pool: &AsyncPool, timeouts: Option<Timeouts>, + prev_wait: Duration, ) -> Result<AsyncPgConnection, StoreError> { let start = Instant::now(); let res = match timeouts { Some(timeouts) => pool.timeout_get(&timeouts).await, None => pool.get().await, }; - let elapsed = start.elapsed(); + let elapsed = start.elapsed() + prev_wait; self.wait_meter.add_conn_wait_time(elapsed); match res { Ok(conn) => { @@ -551,7 +616,7 @@ impl PoolInner { } async fn get(&self) -> Result<AsyncPgConnection, StoreError> { - self.get_from_pool(&self.pool, None).await + self.get_from_pool(&self.pool, None, Duration::ZERO).await } /// Get the pool for fdw connections. It is an error if none is configured @@ -585,7 +650,7 @@ impl PoolInner { { let pool = self.fdw_pool(logger)?; loop { - match self.get_from_pool(&pool, None).await { + match self.get_from_pool(&pool, None, Duration::ZERO).await { Ok(conn) => return Ok(conn), Err(e) => { if timeout() { @@ -612,7 +677,10 @@ impl PoolInner { create: None, recycle: None, }; - let Ok(conn) = self.get_from_pool(fdw_pool, Some(timeouts)).await else { + let Ok(conn) = self + .get_from_pool(fdw_pool, Some(timeouts), Duration::ZERO) + .await + else { return None; }; Some(conn) @@ -649,7 +717,7 @@ impl PoolInner { ) } - pub(crate) async fn query_permit(&self) -> tokio::sync::OwnedSemaphorePermit { + pub(crate) async fn query_permit(&self) -> OwnedSemaphorePermit { let start = Instant::now(); let permit = self.query_semaphore.cheap_clone().acquire_owned().await; self.semaphore_wait_stats .write() .unwrap() permit.unwrap() } + /// Acquire a permit for indexing operations. This provides backpressure + /// to prevent connection pool exhaustion during mass subgraph startup + /// or high write load. + async fn indexing_permit(&self) -> (OwnedSemaphorePermit, Duration) { + let start = Instant::now(); + let permit = self.indexing_semaphore.cheap_clone().acquire_owned().await; + let elapsed = start.elapsed(); + self.indexing_semaphore_wait_stats + .write() + .unwrap() + .add_and_register(elapsed, &self.indexing_semaphore_wait_gauge); + (permit.unwrap(), elapsed) + } + + /// Get a connection with backpressure via semaphore permit. Use this + /// for indexing operations to prevent pool exhaustion. This method waits + /// indefinitely until both a permit and a connection are + /// available.
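+ /// Unlike `query_permit`, which gates individual GraphQL queries, the + /// permit acquired here stays attached to the connection object itself.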
+ pub(crate) async fn get_permitted(&self) -> Result { + let (permit, permit_wait) = self.indexing_permit().await; + let conn = self.get_from_pool(&self.pool, None, permit_wait).await?; + Ok(PermittedConnection { + conn, + _permit: permit, + }) + } + async fn configure_fdw(&self, servers: &[ForeignServer]) -> Result<(), StoreError> { info!(&self.logger, "Setting up fdw"); let mut conn = self.get().await?; diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 59e9fb4ec3d..5a1840b39e3 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -4,7 +4,7 @@ use crate::{ block_range::UNVERSIONED_RANGE, detail::DeploymentDetail, - pool::PRIMARY_PUBLIC, + pool::{PermittedConnection, PRIMARY_PUBLIC}, subgraph_store::{unused, Shard, PRIMARY_SHARD}, AsyncPgConnection, ConnectionPool, ForeignServer, NotificationSender, }; @@ -779,11 +779,11 @@ mod queries { /// A wrapper for a database connection that provides access to functionality /// that works only on the primary database pub struct Connection { - conn: AsyncPgConnection, + conn: PermittedConnection, } impl Connection { - pub fn new(conn: AsyncPgConnection) -> Self { + pub fn new(conn: PermittedConnection) -> Self { Self { conn } } @@ -807,13 +807,13 @@ impl Connection { type TM = ::TransactionManager; async move { - TM::begin_transaction(&mut self.conn).await?; + TM::begin_transaction(&mut *self.conn).await?; match callback(self).await { Ok(value) => { - TM::commit_transaction(&mut self.conn).await?; + TM::commit_transaction(&mut *self.conn).await?; Ok(value) } - Err(user_error) => match TM::rollback_transaction(&mut self.conn).await { + Err(user_error) => match TM::rollback_transaction(&mut *self.conn).await { Ok(()) => Err(user_error), Err(diesel::result::Error::BrokenTransactionManager) => { // In this case we are probably more interested by the @@ -1991,7 +1991,7 @@ impl Primary { pub async fn is_source(&self, site: &Site) -> Result { use active_copies as ac; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; select(diesel::dsl::exists( ac::table @@ -2006,7 +2006,7 @@ impl Primary { pub async fn is_copy_cancelled(&self, dst: &Site) -> Result { use active_copies as ac; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; ac::table .filter(ac::dst.eq(dst.id)) @@ -2098,7 +2098,7 @@ impl Mirror { { async move { for pool in self.pools.as_ref() { - let mut conn = match pool.get().await { + let mut conn = match pool.get_permitted().await { Ok(conn) => conn, Err(StoreError::DatabaseUnavailable) => continue, Err(e) => return Err(e), diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index bf23245013d..7b9bc0b8e41 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -1010,7 +1010,7 @@ mod status { pub async fn runs(&self) -> StoreResult> { use prune_state as ps; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let runs = ps::table .filter(ps::id.eq(self.layout.site.id)) .select(ps::run) @@ -1029,7 +1029,7 @@ mod status { use prune_state as ps; use prune_table_state as pts; - let mut conn = self.pool.get().await?; + let mut conn = self.pool.get_permitted().await?; let ptss = pts::table .filter(pts::id.eq(self.layout.site.id)) diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 396aee2e595..9cbb06f34b1 100644 --- a/store/postgres/src/subgraph_store.rs 
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index 396aee2e595..9cbb06f34b1 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -819,7 +819,7 @@ impl Inner {
     /// of connections in between getting the first one and trying to get the
     /// second one.
     pub(crate) async fn primary_conn(&self) -> Result {
-        let conn = self.mirror.primary().get().await?;
+        let conn = self.mirror.primary().get_permitted().await?;
         Ok(primary::Connection::new(conn))
     }

@@ -1381,7 +1381,7 @@ impl EnsLookup {
     }

     async fn is_table_empty(pool: &ConnectionPool) -> Result {
-        let conn = pool.get().await?;
+        let conn = pool.get_permitted().await?;
         primary::Connection::new(conn).is_ens_table_empty().await
     }
 }
@@ -1389,7 +1389,7 @@ impl EnsLookup {
 #[async_trait]
 impl EnsLookupTrait for EnsLookup {
     async fn find_name(&self, hash: &str) -> Result<Option<String>, StoreError> {
-        let conn = self.primary.get().await?;
+        let conn = self.primary.get_permitted().await?;
         primary::Connection::new(conn).find_ens_name(hash).await
     }

diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs
index 74b8c99b6c8..39ef086e61d 100644
--- a/store/postgres/src/writable.rs
+++ b/store/postgres/src/writable.rs
@@ -638,6 +638,72 @@ impl BlockTracker {
     }
 }

+/// A batch that is queued for writing. The read methods on the `Queue` like
+/// `get` and `get_many` need to be able to read from a batch until the
+/// database changes that the batch contains have been committed. At the
+/// same time, once the background writer starts processing a batch, it must
+/// not be modified. This enum makes it possible to do this without cloning
+/// the batch or holding a lock across an await point.
+///
+/// When a batch is first added to the queue, it can still be appended to
+/// (`Open`). Once the background writer starts processing the batch, it is
+/// closed (`Closed`) and can no longer be modified.
+///
+/// The `processed` flag in the `Request` is a shortcut to determine whether
+/// a batch can still be appended to.
+enum QueuedBatch {
+    /// An open batch that can still be appended to
+    Open(Batch),
+    /// A closed batch that can no longer be modified
+    Closed(Arc<Batch>),
+    /// Temporary placeholder during state transitions. Must never be
+    /// observed outside of `QueuedBatch::close`.
+    Invalid,
+}
+
+impl Deref for QueuedBatch {
+    type Target = Batch;
+
+    fn deref(&self) -> &Self::Target {
+        match self {
+            QueuedBatch::Open(batch) => batch,
+            QueuedBatch::Closed(batch) => batch.as_ref(),
+            QueuedBatch::Invalid => unreachable!("deref is never called on a QueuedBatch::Invalid"),
+        }
+    }
+}
+
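[Note: the `close` method in the `impl QueuedBatch` block that follows moves the batch out of `&mut self` without cloning it, which is why the `Invalid` placeholder variant exists at all. A reduced sketch of that `std::mem::replace` state transition, with a hypothetical `Item` alias standing in for `Batch` and plain `Arc::clone` in place of `cheap_clone`.]

```rust
use std::mem;
use std::sync::Arc;

/// Hypothetical stand-in for `Batch` in this sketch.
type Item = Vec<u32>;

enum State {
    Open(Item),
    Closed(Arc<Item>),
    /// Placeholder that only exists transiently inside `close`.
    Invalid,
}

impl State {
    /// Move the value out of `&mut self` by swapping in `Invalid`,
    /// transform it, and write the result back. The `Item` is never
    /// cloned; `Closed` just wraps it in an `Arc`.
    fn close(&mut self) -> Arc<Item> {
        let old = mem::replace(self, State::Invalid);
        *self = match old {
            State::Open(item) => State::Closed(Arc::new(item)),
            closed @ State::Closed(_) => closed,
            State::Invalid => unreachable!("Invalid never escapes close"),
        };
        match self {
            State::Closed(item) => Arc::clone(item),
            _ => unreachable!("close always leaves self Closed"),
        }
    }
}

fn main() {
    let mut state = State::Open(vec![1, 2, 3]);
    let first = state.close();
    let second = state.close();
    // `close` is idempotent: both handles point at the same closed data.
    assert!(Arc::ptr_eq(&first, &second));
    println!("{:?}", first);
}
```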
+impl QueuedBatch {
+    /// Append another batch to this one. Returns an error if this batch is
+    /// closed.
+    fn append(&mut self, other: Batch) -> Result<(), StoreError> {
+        match self {
+            QueuedBatch::Open(batch) => batch.append(other),
+            QueuedBatch::Closed(_) => Err(internal_error!("attempt to append to closed batch")),
+            QueuedBatch::Invalid => {
+                unreachable!("append is never called on a QueuedBatch::Invalid")
+            }
+        }
+    }
+
+    /// Close the current batch, i.e., replace it with a `Closed` batch and
+    /// return a clone of the `Arc` that the closed batch now holds.
+    fn close(&mut self) -> Arc<Batch> {
+        let old = std::mem::replace(self, QueuedBatch::Invalid);
+        *self = match old {
+            QueuedBatch::Open(batch) => QueuedBatch::Closed(Arc::new(batch)),
+            closed @ QueuedBatch::Closed(_) => closed,
+            QueuedBatch::Invalid => unreachable!("close is never called on a QueuedBatch::Invalid"),
+        };
+        match self {
+            QueuedBatch::Closed(batch) => batch.cheap_clone(),
+            QueuedBatch::Open(_) | QueuedBatch::Invalid => {
+                unreachable!("close must have set self to Closed")
+            }
+        }
+    }
+}
+
 /// A write request received from the `WritableStore` frontend that gets
 /// queued
 ///
@@ -654,7 +720,12 @@ enum Request {
         // will try to read the batch. The batch only becomes truly readonly
         // when we decide to process it at which point we set `processed` to
         // `true`
-        batch: RwLock<Batch>,
+        batch: RwLock<QueuedBatch>,
+        /// True if the background writer has started processing this
+        /// request. It is guaranteed that once the batch is a
+        /// `QueuedBatch::Closed`, this flag is true. This flag serves as a
+        /// shortcut to check that without having to acquire the lock around
+        /// the batch.
         processed: AtomicBool,
     },
     RevertTo {
@@ -699,7 +770,7 @@ impl Request {
             queued: Instant::now(),
             store,
             stopwatch,
-            batch: RwLock::new(batch),
+            batch: RwLock::new(QueuedBatch::Open(batch)),
             processed: AtomicBool::new(false),
         }
     }
@@ -741,7 +812,9 @@ impl Request {
                 processed: _,
             } => {
                 let start = Instant::now();
-                let batch = batch.read().unwrap();
+
+                let batch = batch.write().unwrap().close();
+
                 if let Some(err) = &batch.error {
                     // This can happen when appending to the batch failed
                     // because of an internal error. Returning an `Err` here
@@ -749,7 +822,7 @@
                     return Err(err.clone());
                 }
                 let res = store
-                    .transact_block_operations(batch.deref(), stopwatch)
+                    .transact_block_operations(&batch, stopwatch)
                     .await
                     .map(|()| ExecResult::Continue);
                 info!(store.logger, "Committed write batch";
diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs
index bfcb035456b..a671e770a6f 100644
--- a/store/test-store/src/store.rs
+++ b/store/test-store/src/store.rs
@@ -672,7 +672,7 @@ fn build_store() -> (Arc, ConnectionPool, Config, Arc
 graph_store_postgres::layout_for_tests::Connection {
-    let conn = PRIMARY_POOL.get().await.unwrap();
+    let conn = PRIMARY_POOL.get_permitted().await.unwrap();
     graph_store_postgres::layout_for_tests::Connection::new(conn)
 }
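[Note: the switch from `batch.read().unwrap()` to `batch.write().unwrap().close()` above means the `RwLock` guard lives only for the duration of one statement; the writer then awaits the database write while holding only an owned `Arc`. A compressed sketch of that shape, with a hypothetical `do_write` standing in for `transact_block_operations` and a read-and-clone in place of `close`.]

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in: a batch is just a list of operations here.
type Batch = Vec<&'static str>;

/// Stand-in for `transact_block_operations`.
async fn do_write(batch: &Batch) {
    println!("writing {} ops", batch.len());
}

async fn process(shared: &RwLock<Arc<Batch>>) {
    // The guard lives only for this statement: clone the Arc, drop the
    // lock, then await. Holding a std RwLock guard across `.await` would
    // make the future non-Send (so it could not be spawned on a
    // multi-threaded runtime) and would block readers for the whole write.
    let snapshot: Arc<Batch> = shared.read().unwrap().clone();
    do_write(&snapshot).await;
}

#[tokio::main]
async fn main() {
    let shared = RwLock::new(Arc::new(vec!["insert", "update"]));
    process(&shared).await;
}
```

In the real change, `close()` additionally flips the batch to `QueuedBatch::Closed`, so concurrent `get`/`get_many` readers keep reading the same data through `Deref` while the write is in flight.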