Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/graphman/src/commands/deployment/pause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub async fn load_active_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<ActiveDeployment, PauseDeploymentError> {
let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let mut primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
Expand Down Expand Up @@ -76,7 +79,7 @@ pub async fn pause_active_deployment(
notification_sender: Arc<NotificationSender>,
active_deployment: ActiveDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get().await?;
let primary_conn = primary_pool.get_permitted().await?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn.pause_subgraph(&active_deployment.site).await?;
Expand Down
15 changes: 12 additions & 3 deletions core/graphman/src/commands/deployment/reassign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ impl Deployment {
&self,
primary_pool: ConnectionPool,
) -> Result<Option<NodeId>, GraphmanError> {
let primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);
let node = catalog_conn
.assigned_node(&self.site)
Expand Down Expand Up @@ -58,7 +61,10 @@ pub async fn load_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<Deployment, ReassignDeploymentError> {
let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let mut primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
Expand Down Expand Up @@ -87,7 +93,10 @@ pub async fn reassign_deployment(
node: &NodeId,
curr_node: Option<NodeId>,
) -> Result<ReassignResult, ReassignDeploymentError> {
let primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);
let changes: Vec<AssignmentChange> = match &curr_node {
Some(curr) => {
Expand Down
7 changes: 5 additions & 2 deletions core/graphman/src/commands/deployment/resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub async fn load_paused_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<PausedDeployment, ResumeDeploymentError> {
let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let mut primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
Expand Down Expand Up @@ -76,7 +79,7 @@ pub async fn resume_paused_deployment(
notification_sender: Arc<NotificationSender>,
paused_deployment: PausedDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get().await?;
let primary_conn = primary_pool.get_permitted().await?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn
Expand Down
7 changes: 5 additions & 2 deletions core/graphman/src/commands/deployment/unassign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub async fn load_assigned_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<AssignedDeployment, UnassignDeploymentError> {
let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let mut primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
Expand Down Expand Up @@ -73,7 +76,7 @@ pub async fn unassign_deployment(
notification_sender: Arc<NotificationSender>,
deployment: AssignedDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get().await?;
let primary_conn = primary_pool.get_permitted().await?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn.unassign_subgraph(&deployment.site).await?;
Expand Down
8 changes: 4 additions & 4 deletions graph/src/blockchain/firehose_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
// Back off exponentially whenever we encounter a connection error or a stream with bad data
let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45));

// This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments
#[allow(unused_assignments)]
let mut skip_backoff = false;

try_stream! {
// This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments
#[allow(unused_assignments)]
let mut skip_backoff = false;

loop {
let endpoint = client.firehose_endpoint().await?;
let logger = logger.new(o!("deployment" => deployment.clone(), "provider" => endpoint.provider.to_string()));
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use futures03::future::BoxFuture;
use graph_derive::CheapClone;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use slog::{error, Logger};
use slog::Logger;
use std::{
any::Any,
collections::{HashMap, HashSet},
Expand Down
8 changes: 4 additions & 4 deletions graph/src/blockchain/substreams_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,13 @@ fn stream_blocks<C: Blockchain, F: BlockStreamMapper<C>>(
// Back off exponentially whenever we encounter a connection error or a stream with bad data
let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45));

// This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments
#[allow(unused_assignments)]
let mut skip_backoff = false;

let mut log_data = SubstreamsLogData::new();

try_stream! {
// This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments
#[allow(unused_assignments)]
let mut skip_backoff = false;

if !modules.modules.iter().any(|m| module_name.eq(&m.name)) {
Err(BlockStreamError::Fatal(format!(
"module `{}` not found",
Expand Down
1 change: 0 additions & 1 deletion graph/src/components/network_provider/provider_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::Duration;

use derivative::Derivative;
use itertools::Itertools;
use slog::error;
use slog::info;
use slog::warn;
use slog::Logger;
Expand Down
32 changes: 30 additions & 2 deletions node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl Shard {
fn validate(&mut self, name: &str) -> Result<()> {
ShardName::new(name.to_string()).map_err(|e| anyhow!(e))?;

self.connection = shellexpand::env(&self.connection)?.into_owned();
self.expand_connection()?;

if matches!(self.pool_size, PoolSize::None) {
return Err(anyhow!("missing pool size definition for shard `{}`", name));
Expand Down Expand Up @@ -301,6 +301,25 @@ impl Shard {
replicas,
})
}

fn expand_connection(&mut self) -> Result<()> {
let mut url = Url::parse(shellexpand::env(&self.connection)?.as_ref())?;
// Put the PGAPPNAME into the URL since tokio-postgres ignores this
// environment variable
if let Some(app_name) = std::env::var("PGAPPNAME").ok() {
let query = match url.query() {
Some(query) => {
format!("{query}&application_name={app_name}")
}
None => {
format!("application_name={app_name}")
}
};
url.set_query(Some(&query));
}
self.connection = url.to_string();
Ok(())
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -1944,6 +1963,10 @@ mod tests {
let query = NodeId::new("query_node_1").unwrap();
let other = NodeId::new("other_node_1").unwrap();

let appname = std::env::var("PGAPPNAME").ok();
unsafe {
std::env::set_var("PGAPPNAME", "config-test");
}
let shard = {
let mut shard = toml::from_str::<Shard>(
r#"
Expand All @@ -1961,10 +1984,15 @@ fdw_pool_size = [
shard.validate("index_node_1").unwrap();
shard
};
if let Some(appname) = appname {
unsafe {
std::env::set_var("PGAPPNAME", appname);
}
}

assert_eq!(
shard.connection,
"postgresql://postgres:postgres@postgres/graph"
"postgresql://postgres:postgres@postgres/graph?application_name=config-test"
);

assert_eq!(shard.pool_size.size_for(&index, "ashard").unwrap(), 20);
Expand Down
6 changes: 3 additions & 3 deletions node/src/manager/commands/assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub async fn unassign(
) -> Result<(), Error> {
let locator = search.locate_unique(&primary).await?;

let pconn = primary.get().await?;
let pconn = primary.get_permitted().await?;
let mut conn = catalog::Connection::new(pconn);

let site = conn
Expand All @@ -38,7 +38,7 @@ pub async fn reassign(
let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?;
let locator = search.locate_unique(&primary).await?;

let pconn = primary.get().await?;
let pconn = primary.get_permitted().await?;
let mut conn = catalog::Connection::new(pconn);

let site = conn
Expand Down Expand Up @@ -81,7 +81,7 @@ pub async fn pause_or_resume(
locator: &DeploymentLocator,
should_pause: bool,
) -> Result<(), Error> {
let pconn = primary.get().await?;
let pconn = primary.get_permitted().await?;
let mut conn = catalog::Connection::new(pconn);

let site = conn
Expand Down
5 changes: 3 additions & 2 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ pub async fn info(

pub async fn remove(primary: ConnectionPool, store: BlockStore, name: String) -> Result<(), Error> {
let sites = {
let mut conn =
graph_store_postgres::command_support::catalog::Connection::new(primary.get().await?);
let mut conn = graph_store_postgres::command_support::catalog::Connection::new(
primary.get_permitted().await?,
);
conn.find_sites_for_network(&name).await?
};

Expand Down
2 changes: 1 addition & 1 deletion node/src/manager/commands/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub async fn run(
if !start_block && (block_hash.is_none() || block_number.is_none()) {
bail!("--block-hash and --block-number must be specified when --start-block is not set");
}
let pconn = primary.get().await?;
let pconn = primary.get_permitted().await?;
let mut conn = store_catalog::Connection::new(pconn);

let subgraph_store = store.subgraph_store();
Expand Down
2 changes: 1 addition & 1 deletion node/src/manager/commands/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn site_and_conn(
let primary_pool = pools.get(&*PRIMARY_SHARD).unwrap();
let locator = search.locate_unique(primary_pool).await?;

let pconn = primary_pool.get().await?;
let pconn = primary_pool.get_permitted().await?;
let mut conn = store_catalog::Connection::new(pconn);

let site = conn
Expand Down
6 changes: 5 additions & 1 deletion server/graphman/src/resolvers/deployment_mutation/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use crate::resolvers::context::GraphmanContext;
use graphman::GraphmanError;

pub async fn run(ctx: &GraphmanContext, name: &String) -> Result<()> {
let primary_pool = ctx.primary_pool.get().await.map_err(GraphmanError::from)?;
let primary_pool = ctx
.primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_pool);

let name = match SubgraphName::new(name) {
Expand Down
6 changes: 5 additions & 1 deletion server/graphman/src/resolvers/deployment_mutation/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use crate::resolvers::context::GraphmanContext;
use graphman::GraphmanError;

pub async fn run(ctx: &GraphmanContext, name: &String) -> Result<()> {
let primary_pool = ctx.primary_pool.get().await.map_err(GraphmanError::from)?;
let primary_pool = ctx
.primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_pool);

let name = match SubgraphName::new(name) {
Expand Down
8 changes: 4 additions & 4 deletions store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub mod primary {
}

pub(super) async fn drop_chain(pool: &ConnectionPool, name: &str) -> Result<(), StoreError> {
let mut conn = pool.get().await?;
let mut conn = pool.get_permitted().await?;

delete(chains::table.filter(chains::name.eq(name)))
.execute(&mut conn)
Expand Down Expand Up @@ -427,7 +427,7 @@ impl BlockStore {
let cached = match self.chain_head_cache.get(shard.as_str()) {
Some(cached) => cached,
None => {
let mut conn = match pool.get().await {
let mut conn = match pool.get_permitted().await {
Ok(conn) => conn,
Err(StoreError::DatabaseUnavailable) => continue,
Err(e) => return Err(e),
Expand Down Expand Up @@ -568,7 +568,7 @@ impl BlockStore {
use crate::primary::db_version as dbv;

let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap();
let mut conn = primary_pool.get().await?;
let mut conn = primary_pool.get_permitted().await?;
let version: i64 = dbv::table
.select(dbv::version)
.get_result(&mut conn)
Expand Down Expand Up @@ -667,7 +667,7 @@ impl ChainIdStore for BlockStore {

// Update the master copy in the primary
let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap();
let mut conn = primary_pool.get().await?;
let mut conn = primary_pool.get_permitted().await?;

diesel::update(c::table.filter(c::name.eq(chain_name.as_str())))
.set((
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/chain_head_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl ChainHeadUpdateSender {
"head_block_number": number
});

let mut conn = self.pool.get().await?;
let mut conn = self.pool.get_permitted().await?;
self.sender
.notify(
&mut conn,
Expand Down
Loading