Skip to content

Commit 275ce0c

Browse files
committed
store: Introduce ConnectionPool.get_permitted
This helps us avoid timeouts when we encounter a thundering herd, for example when an index node starts up and all subgraphs want access to the database.
1 parent 82b60ac commit 275ce0c

File tree

9 files changed

+202
-110
lines changed

9 files changed

+202
-110
lines changed

store/postgres/src/block_store.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ pub mod primary {
164164
}
165165

166166
pub(super) async fn drop_chain(pool: &ConnectionPool, name: &str) -> Result<(), StoreError> {
167-
let mut conn = pool.get().await?;
167+
let mut conn = pool.get_permitted().await?;
168168

169169
delete(chains::table.filter(chains::name.eq(name)))
170170
.execute(&mut conn)
@@ -427,7 +427,7 @@ impl BlockStore {
427427
let cached = match self.chain_head_cache.get(shard.as_str()) {
428428
Some(cached) => cached,
429429
None => {
430-
let mut conn = match pool.get().await {
430+
let mut conn = match pool.get_permitted().await {
431431
Ok(conn) => conn,
432432
Err(StoreError::DatabaseUnavailable) => continue,
433433
Err(e) => return Err(e),
@@ -568,7 +568,7 @@ impl BlockStore {
568568
use crate::primary::db_version as dbv;
569569

570570
let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap();
571-
let mut conn = primary_pool.get().await?;
571+
let mut conn = primary_pool.get_permitted().await?;
572572
let version: i64 = dbv::table
573573
.select(dbv::version)
574574
.get_result(&mut conn)
@@ -667,7 +667,7 @@ impl ChainIdStore for BlockStore {
667667

668668
// Update the master copy in the primary
669669
let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap();
670-
let mut conn = primary_pool.get().await?;
670+
let mut conn = primary_pool.get_permitted().await?;
671671

672672
diesel::update(c::table.filter(c::name.eq(chain_name.as_str())))
673673
.set((

store/postgres/src/chain_head_listener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ impl ChainHeadUpdateSender {
261261
"head_block_number": number
262262
});
263263

264-
let mut conn = self.pool.get().await?;
264+
let mut conn = self.pool.get_permitted().await?;
265265
self.sender
266266
.notify(
267267
&mut conn,

store/postgres/src/chain_store.rs

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2024,7 +2024,7 @@ impl ChainStore {
20242024
pub(crate) async fn create(&self, ident: &ChainIdentifier) -> Result<(), Error> {
20252025
use public::ethereum_networks::dsl::*;
20262026

2027-
let mut conn = self.pool.get().await?;
2027+
let mut conn = self.pool.get_permitted().await?;
20282028
conn.transaction(|conn| {
20292029
async move {
20302030
insert_into(ethereum_networks)
@@ -2051,7 +2051,7 @@ impl ChainStore {
20512051

20522052
pub async fn update_name(&self, name: &str) -> Result<(), Error> {
20532053
use public::ethereum_networks as n;
2054-
let mut conn = self.pool.get().await?;
2054+
let mut conn = self.pool.get_permitted().await?;
20552055
conn.transaction(|conn| {
20562056
async {
20572057
update(n::table.filter(n::name.eq(&self.chain)))
@@ -2069,7 +2069,7 @@ impl ChainStore {
20692069
use diesel::dsl::delete;
20702070
use public::ethereum_networks as n;
20712071

2072-
let mut conn = self.pool.get().await?;
2072+
let mut conn = self.pool.get_permitted().await?;
20732073
conn.transaction(|conn| {
20742074
async {
20752075
self.storage.drop_storage(conn, &self.chain).await?;
@@ -2111,7 +2111,7 @@ impl ChainStore {
21112111
let number: Option<i64> = n::table
21122112
.filter(n::name.eq(chain))
21132113
.select(n::head_block_number)
2114-
.first::<Option<i64>>(&mut self.pool.get().await?)
2114+
.first::<Option<i64>>(&mut self.pool.get_permitted().await?)
21152115
.await
21162116
.optional()?
21172117
.flatten();
@@ -2131,7 +2131,7 @@ impl ChainStore {
21312131
pub(crate) async fn set_chain_identifier(&self, ident: &ChainIdentifier) -> Result<(), Error> {
21322132
use public::ethereum_networks as n;
21332133

2134-
let mut conn = self.pool.get().await?;
2134+
let mut conn = self.pool.get_permitted().await?;
21352135

21362136
diesel::update(n::table.filter(n::name.eq(&self.chain)))
21372137
.set((
@@ -2195,14 +2195,14 @@ impl ChainStore {
21952195
}
21962196

21972197
pub async fn delete_blocks(&self, block_hashes: &[&H256]) -> Result<usize, Error> {
2198-
let mut conn = self.pool.get().await?;
2198+
let mut conn = self.pool.get_permitted().await?;
21992199
self.storage
22002200
.delete_blocks_by_hash(&mut conn, &self.chain, block_hashes)
22012201
.await
22022202
}
22032203

22042204
pub async fn cleanup_shallow_blocks(&self, lowest_block: i32) -> Result<(), StoreError> {
2205-
let mut conn = self.pool.get().await?;
2205+
let mut conn = self.pool.get_permitted().await?;
22062206
self.storage
22072207
.cleanup_shallow_blocks(&mut conn, lowest_block)
22082208
.await?;
@@ -2211,12 +2211,12 @@ impl ChainStore {
22112211

22122212
// remove_cursor delete the chain_store cursor and return true if it was present
22132213
pub async fn remove_cursor(&self, chain: &str) -> Result<Option<BlockNumber>, StoreError> {
2214-
let mut conn = self.pool.get().await?;
2214+
let mut conn = self.pool.get_permitted().await?;
22152215
self.storage.remove_cursor(&mut conn, chain).await
22162216
}
22172217

22182218
pub async fn truncate_block_cache(&self) -> Result<(), StoreError> {
2219-
let mut conn = self.pool.get().await?;
2219+
let mut conn = self.pool.get_permitted().await?;
22202220
self.storage.truncate_block_cache(&mut conn).await?;
22212221
Ok(())
22222222
}
@@ -2225,7 +2225,7 @@ impl ChainStore {
22252225
self: &Arc<Self>,
22262226
hashes: Vec<BlockHash>,
22272227
) -> Result<Vec<JsonBlock>, StoreError> {
2228-
let mut conn = self.pool.get().await?;
2228+
let mut conn = self.pool.get_permitted().await?;
22292229
let values = self.storage.blocks(&mut conn, &self.chain, &hashes).await?;
22302230
Ok(values)
22312231
}
@@ -2234,7 +2234,7 @@ impl ChainStore {
22342234
self: &Arc<Self>,
22352235
numbers: Vec<BlockNumber>,
22362236
) -> Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError> {
2237-
let mut conn = self.pool.get().await?;
2237+
let mut conn = self.pool.get_permitted().await?;
22382238
let values = self
22392239
.storage
22402240
.block_ptrs_by_numbers(&mut conn, &self.chain, &numbers)
@@ -2261,7 +2261,7 @@ impl ChainStore {
22612261

22622262
let genesis_block_ptr = self.genesis_block_ptr().await?.hash_as_h256();
22632263

2264-
let mut conn = self.pool.get().await?;
2264+
let mut conn = self.pool.get_permitted().await?;
22652265
let candidate = self
22662266
.storage
22672267
.chain_head_candidate(&mut conn, &self.chain)
@@ -2309,8 +2309,8 @@ impl ChainStore {
23092309
/// Helper for tests that need to directly modify the tables for the
23102310
/// chain store
23112311
#[cfg(debug_assertions)]
2312-
pub async fn get_conn_for_test(&self) -> Result<AsyncPgConnection, Error> {
2313-
let conn = self.pool.get().await?;
2312+
pub async fn get_conn_for_test(&self) -> Result<crate::pool::PermittedConnection, Error> {
2313+
let conn = self.pool.get_permitted().await?;
23142314
Ok(conn)
23152315
}
23162316
}
@@ -2336,7 +2336,7 @@ impl ChainHeadStore for ChainStore {
23362336
async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
23372337
use public::ethereum_networks::dsl::*;
23382338

2339-
let mut conn = self.pool.get().await?;
2339+
let mut conn = self.pool.get_permitted().await?;
23402340
Ok(ethereum_networks
23412341
.select((head_block_hash, head_block_number))
23422342
.filter(name.eq(&self.chain))
@@ -2369,7 +2369,7 @@ impl ChainHeadStore for ChainStore {
23692369
ethereum_networks
23702370
.select(head_block_cursor)
23712371
.filter(name.eq(&self.chain))
2372-
.load::<Option<String>>(&mut self.pool.get().await?)
2372+
.load::<Option<String>>(&mut self.pool.get_permitted().await?)
23732373
.await
23742374
.map(|rows| {
23752375
rows.as_slice()
@@ -2394,7 +2394,7 @@ impl ChainHeadStore for ChainStore {
23942394
//this will send an update via postgres, channel: chain_head_updates
23952395
self.chain_head_update_sender.send(&hash, number).await?;
23962396

2397-
let mut conn = self.pool.get().await?;
2397+
let mut conn = self.pool.get_permitted().await?;
23982398
conn.transaction(|conn| {
23992399
async {
24002400
self.storage
@@ -2437,7 +2437,7 @@ impl ChainStoreTrait for ChainStore {
24372437
self.recent_blocks_cache.insert_block(block);
24382438
}
24392439

2440-
let mut conn = self.pool.get().await?;
2440+
let mut conn = self.pool.get_permitted().await?;
24412441
conn.transaction(|conn| {
24422442
self.storage
24432443
.upsert_block(conn, &self.chain, block.as_ref(), true)
@@ -2448,7 +2448,7 @@ impl ChainStoreTrait for ChainStore {
24482448
}
24492449

24502450
async fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error> {
2451-
let mut conn = self.pool.get().await?;
2451+
let mut conn = self.pool.get_permitted().await?;
24522452
for block in blocks {
24532453
self.storage
24542454
.upsert_block(&mut conn, &self.chain, *block, false)
@@ -2644,7 +2644,7 @@ impl ChainStoreTrait for ChainStore {
26442644
return Ok(Some((data, ptr)));
26452645
}
26462646

2647-
let mut conn = self.pool.get().await?;
2647+
let mut conn = self.pool.get_permitted().await?;
26482648
self.storage
26492649
.ancestor_block(&mut conn, block_ptr, offset, root)
26502650
.await
@@ -2681,7 +2681,7 @@ impl ChainStoreTrait for ChainStore {
26812681
//
26822682
// See 8b6ad0c64e244023ac20ced7897fe666
26832683

2684-
let mut conn = self.pool.get().await?;
2684+
let mut conn = self.pool.get_permitted().await?;
26852685
let query = "
26862686
select coalesce(
26872687
least(a.block,
@@ -2727,7 +2727,7 @@ impl ChainStoreTrait for ChainStore {
27272727
&self,
27282728
number: BlockNumber,
27292729
) -> Result<Vec<BlockHash>, Error> {
2730-
let mut conn = self.pool.get().await?;
2730+
let mut conn = self.pool.get_permitted().await?;
27312731
self.storage
27322732
.block_hashes_by_block_number(&mut conn, &self.chain, number)
27332733
.await
@@ -2738,7 +2738,7 @@ impl ChainStoreTrait for ChainStore {
27382738
number: BlockNumber,
27392739
hash: &BlockHash,
27402740
) -> Result<usize, Error> {
2741-
let mut conn = self.pool.get().await?;
2741+
let mut conn = self.pool.get_permitted().await?;
27422742
self.storage
27432743
.confirm_block_hash(&mut conn, &self.chain, number, hash)
27442744
.await
@@ -2748,7 +2748,7 @@ impl ChainStoreTrait for ChainStore {
27482748
&self,
27492749
hash: &BlockHash,
27502750
) -> Result<Option<(String, BlockNumber, Option<u64>, Option<BlockHash>)>, StoreError> {
2751-
let mut conn = self.pool.get().await?;
2751+
let mut conn = self.pool.get_permitted().await?;
27522752
self.storage.block_number(&mut conn, hash).await.map(|opt| {
27532753
opt.map(|(number, timestamp, parent_hash)| {
27542754
(self.chain.clone(), number, timestamp, parent_hash)
@@ -2764,14 +2764,14 @@ impl ChainStoreTrait for ChainStore {
27642764
return Ok(HashMap::new());
27652765
}
27662766

2767-
let mut conn = self.pool.get().await?;
2767+
let mut conn = self.pool.get_permitted().await?;
27682768
self.storage
27692769
.block_numbers(&mut conn, hashes.as_slice())
27702770
.await
27712771
}
27722772

27732773
async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error> {
2774-
let mut conn = self.pool.get().await?;
2774+
let mut conn = self.pool.get_permitted().await?;
27752775
if let Some(head) = self.chain_head_block(&self.chain).await? {
27762776
self.storage
27772777
.clear_call_cache(&mut conn, head, from, to)
@@ -2785,7 +2785,7 @@ impl ChainStoreTrait for ChainStore {
27852785
ttl_days: i32,
27862786
ttl_max_contracts: Option<i64>,
27872787
) -> Result<(), Error> {
2788-
let conn = &mut self.pool.get().await?;
2788+
let conn = &mut self.pool.get_permitted().await?;
27892789
self.storage
27902790
.clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts)
27912791
.await
@@ -2795,15 +2795,15 @@ impl ChainStoreTrait for ChainStore {
27952795
&self,
27962796
block_hash: &H256,
27972797
) -> Result<Vec<LightTransactionReceipt>, StoreError> {
2798-
let mut conn = self.pool.get().await?;
2798+
let mut conn = self.pool.get_permitted().await?;
27992799
self.storage
28002800
.find_transaction_receipts_in_block(&mut conn, *block_hash)
28012801
.await
28022802
.map_err(StoreError::from)
28032803
}
28042804

28052805
async fn chain_identifier(&self) -> Result<ChainIdentifier, Error> {
2806-
let mut conn = self.pool.get().await?;
2806+
let mut conn = self.pool.get_permitted().await?;
28072807
use public::ethereum_networks as n;
28082808
let (genesis_block_hash, net_version) = n::table
28092809
.select((n::genesis_block_hash, n::net_version))
@@ -3045,7 +3045,7 @@ impl EthereumCallCache for ChainStore {
30453045
block: BlockPtr,
30463046
) -> Result<Option<call::Response>, Error> {
30473047
let id = contract_call_id(req, &block);
3048-
let conn = &mut self.pool.get().await?;
3048+
let conn = &mut self.pool.get_permitted().await?;
30493049
let return_value = conn
30503050
.transaction::<_, Error, _>(|conn| {
30513051
async {
@@ -3086,7 +3086,7 @@ impl EthereumCallCache for ChainStore {
30863086
.collect();
30873087
let id_refs: Vec<_> = ids.iter().map(|id| id.as_slice()).collect();
30883088

3089-
let conn = &mut self.pool.get().await?;
3089+
let conn = &mut self.pool.get_permitted().await?;
30903090
let rows = conn
30913091
.transaction::<_, Error, _>(|conn| {
30923092
self.storage
@@ -3120,7 +3120,7 @@ impl EthereumCallCache for ChainStore {
31203120
}
31213121

31223122
async fn get_calls_in_block(&self, block: BlockPtr) -> Result<Vec<CachedEthereumCall>, Error> {
3123-
let conn = &mut self.pool.get().await?;
3123+
let conn = &mut self.pool.get_permitted().await?;
31243124
conn.transaction::<_, Error, _>(|conn| {
31253125
self.storage.get_calls_in_block(conn, block).scope_boxed()
31263126
})
@@ -3149,7 +3149,7 @@ impl EthereumCallCache for ChainStore {
31493149
};
31503150

31513151
let id = contract_call_id(&call, &block);
3152-
let conn = &mut self.pool.get().await?;
3152+
let conn = &mut self.pool.get_permitted().await?;
31533153
conn.transaction::<_, anyhow::Error, _>(|conn| {
31543154
self.storage
31553155
.set_call(

0 commit comments

Comments
 (0)