Skip to content

Commit 696285b

Browse files
authored
chore(coprocessor): Improve usability of DatabaseURL (#1082)
* chore(coprocessor): implement a DatabaseURL wrapper * chore(coprocessor): use DatabaseURL in sns-worker * chore(coprocessor): use DatabaseURL in zkproof-worker * chore(coprocessor): use DatabaseURL in txn-sender * chore(coprocessor): append default application-name to the db_url * chore(coprocessor): use DatabaseURL across all services * chore(coprocessor): add mask_database_url unit test * chore(coprocessor): append always the default app_name
1 parent 3d2d071 commit 696285b

File tree

33 files changed

+272
-109
lines changed

33 files changed

+272
-109
lines changed

coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ use std::time::Duration;
55
use serde::{de::DeserializeOwned, Serialize};
66
use tfhe::{named::Named, prelude::ParameterSetConformant, Unversionize, Versionize};
77

8+
use sqlx::postgres::PgConnectOptions;
9+
use std::fmt;
10+
use std::str::FromStr;
11+
812
use crate::types::FhevmError;
913

1014
pub const SAFE_SER_DESER_LIMIT: u64 = 1024 * 1024 * 16;
@@ -110,3 +114,125 @@ impl Default for HeartBeat {
110114
Self::new()
111115
}
112116
}
117+
/// Simple wrapper around Database URL string to provide
118+
/// url constraints and masking functionality.
119+
#[derive(Clone)]
120+
pub struct DatabaseURL(String);
121+
122+
impl From<&str> for DatabaseURL {
123+
fn from(s: &str) -> Self {
124+
let url = s.to_owned();
125+
let app_name = Self::default_app_name();
126+
Self::new_with_app_name(&url, &app_name)
127+
}
128+
}
129+
impl From<String> for DatabaseURL {
130+
fn from(s: String) -> Self {
131+
let url = s.to_owned();
132+
let app_name = Self::default_app_name();
133+
Self::new_with_app_name(&url, &app_name)
134+
}
135+
}
136+
137+
impl Default for DatabaseURL {
138+
fn default() -> Self {
139+
let url = std::env::var("DATABASE_URL")
140+
.unwrap_or("postgres://postgres:postgres@localhost:5432/coprocessor".to_owned());
141+
142+
let app_name = Self::default_app_name();
143+
Self::new_with_app_name(&url, &app_name)
144+
}
145+
}
146+
147+
impl DatabaseURL {
148+
/// Create a new DatabaseURL, appending application_name if not present
149+
/// If the base URL already contains an application_name, it will be preserved.
150+
///
151+
/// application_name is useful for identifying the source of DB conns
152+
pub fn new_with_app_name(base: &str, app_name: &str) -> Self {
153+
let app_name = app_name.trim();
154+
if app_name.is_empty() {
155+
return Self(base.to_owned());
156+
}
157+
158+
// Append application_name if not present
159+
let mut url = base.to_owned();
160+
if !url.contains("application_name=") {
161+
if url.contains('?') {
162+
url.push_str(&format!("&application_name={}", app_name));
163+
} else {
164+
url.push_str(&format!("?application_name={}", app_name));
165+
}
166+
}
167+
let url: Self = Self(url);
168+
let _ = url.parse().expect("DatabaseURL should be valid");
169+
url
170+
}
171+
172+
/// Get default app name from the executable name
173+
fn default_app_name() -> String {
174+
std::env::args()
175+
.next()
176+
.and_then(|path| {
177+
std::path::Path::new(&path)
178+
.file_name()
179+
.map(|s| s.to_string_lossy().into_owned())
180+
})
181+
.unwrap_or_default()
182+
}
183+
184+
pub fn as_str(&self) -> &str {
185+
self.0.as_str()
186+
}
187+
188+
pub fn into_inner(self) -> String {
189+
self.0
190+
}
191+
192+
fn mask_password(options: &PgConnectOptions) -> String {
193+
let new_url = format!(
194+
"postgres://{}:{}@{}:{}/{}?application_name={}",
195+
options.get_username(),
196+
"*****",
197+
options.get_host(),
198+
options.get_port(),
199+
options.get_database().unwrap_or_default(),
200+
options.get_application_name().unwrap_or_default()
201+
);
202+
new_url
203+
}
204+
205+
pub fn parse(&self) -> Result<PgConnectOptions, sqlx::Error> {
206+
PgConnectOptions::from_str(self.as_str())
207+
}
208+
}
209+
210+
impl fmt::Display for DatabaseURL {
211+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212+
match PgConnectOptions::from_str(self.as_str()) {
213+
Ok(options) => {
214+
write!(f, "{:?}", Self::mask_password(&options))
215+
}
216+
Err(_) => write!(f, "Invalid DatabaseURL"),
217+
}
218+
}
219+
}
220+
221+
impl fmt::Debug for DatabaseURL {
222+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223+
match PgConnectOptions::from_str(self.as_str()) {
224+
Ok(options) => {
225+
write!(f, "{:?}", options.password("*****"))
226+
}
227+
Err(_) => write!(f, "Invalid DatabaseURL"),
228+
}
229+
}
230+
}
231+
impl FromStr for DatabaseURL {
232+
type Err = sqlx::Error;
233+
234+
fn from_str(s: &str) -> Result<Self, Self::Err> {
235+
let _ = PgConnectOptions::from_str(s)?;
236+
Ok(Self(s.to_owned()))
237+
}
238+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use fhevm_engine_common::utils::DatabaseURL;
2+
3+
#[tokio::test]
4+
async fn mask_database_url() {
5+
let db_url: DatabaseURL = "postgres://postgres:mypassword@localhost:5432/coprocessor".into();
6+
7+
let debug_fmt = format!("{:?}", db_url);
8+
assert!(!debug_fmt.contains("mypassword"));
9+
10+
let display_fmt = format!("{}", db_url);
11+
assert!(!display_fmt.contains("mypassword"));
12+
println!("DatabaseURL: {}", db_url);
13+
14+
let db_url: DatabaseURL = DatabaseURL::new_with_app_name(
15+
"postgres://user:secret@dbhost:5432/mydb?sslmode=require",
16+
"tfhe-worker",
17+
);
18+
19+
assert_eq!(
20+
db_url.as_str(),
21+
"postgres://user:secret@dbhost:5432/mydb?sslmode=require&application_name=tfhe-worker"
22+
);
23+
24+
let db_url: DatabaseURL =
25+
DatabaseURL::new_with_app_name("postgres://user:secret@dbhost:5432/mydb", "tfhe-worker");
26+
27+
assert_eq!(
28+
db_url.as_str(),
29+
"postgres://user:secret@dbhost:5432/mydb?application_name=tfhe-worker"
30+
);
31+
32+
println!("DatabaseURL: {}", db_url);
33+
34+
let db_url: DatabaseURL =
35+
DatabaseURL::new_with_app_name("postgres://user:secret@dbhost:5432/mydb", " ");
36+
37+
assert_eq!(db_url.as_str(), "postgres://user:secret@dbhost:5432/mydb");
38+
}

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33
use alloy::providers::{ProviderBuilder, WsConnect};
44
use alloy::{primitives::Address, transports::http::reqwest::Url};
55
use clap::Parser;
6-
use fhevm_engine_common::{metrics_server, telemetry};
6+
use fhevm_engine_common::{metrics_server, telemetry, utils::DatabaseURL};
77
use gw_listener::aws_s3::AwsS3Client;
88
use gw_listener::chain_id_from_env;
99
use gw_listener::gw_listener::GatewayListener;
@@ -18,7 +18,7 @@ use tracing::{error, info, Level};
1818
#[command(version, about, long_about = None)]
1919
struct Conf {
2020
#[arg(long)]
21-
database_url: Option<String>,
21+
database_url: Option<DatabaseURL>,
2222

2323
#[arg(long, default_value_t = 16)]
2424
database_pool_size: u32,
@@ -105,18 +105,14 @@ async fn main() -> anyhow::Result<()> {
105105
.with_max_level(conf.log_level)
106106
.init();
107107

108-
info!(conf = ?conf, "Starting gw_listener");
109-
110108
if !conf.service_name.is_empty() {
111109
if let Err(err) = telemetry::setup_otlp(&conf.service_name) {
112110
error!(error = %err, "Failed to setup OTLP");
113111
}
114112
}
115113

116-
let database_url = conf
117-
.database_url
118-
.clone()
119-
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined"));
114+
info!(gateway_url = %conf.gw_url, max_retries = %conf.provider_max_retries,
115+
retry_interval = ?conf.provider_retry_interval, "Connecting to Gateway");
120116

121117
let provider = loop {
122118
match ProviderBuilder::new()
@@ -152,7 +148,7 @@ async fn main() -> anyhow::Result<()> {
152148
};
153149
let config = ConfigSettings {
154150
host_chain_id,
155-
database_url,
151+
database_url: conf.database_url.clone().unwrap_or_default(),
156152
database_pool_size: conf.database_pool_size,
157153
verify_proof_req_db_channel: conf.verify_proof_req_database_channel,
158154
gw_url: conf.gw_url,

coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
8181
);
8282
let db_pool = PgPoolOptions::new()
8383
.max_connections(self.conf.database_pool_size)
84-
.connect(&self.conf.database_url)
84+
.connect(self.conf.database_url.as_str())
8585
.await?;
8686

8787
let input_verification_handle = {
@@ -549,7 +549,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
549549
// Check database connection
550550
let db_pool_result = PgPoolOptions::new()
551551
.max_connections(self.conf.database_pool_size)
552-
.connect(&self.conf.database_url)
552+
.connect(self.conf.database_url.as_str())
553553
.await;
554554

555555
match db_pool_result {

coprocessor/fhevm-engine/gw-listener/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use alloy::primitives::Uint;
22
use alloy::transports::http::reqwest::Url;
3+
use fhevm_engine_common::utils::DatabaseURL;
34
use std::time::Duration;
45

56
use tracing::error;
@@ -36,7 +37,7 @@ impl TryFrom<u8> for KeyType {
3637
#[derive(Clone, Debug)]
3738
pub struct ConfigSettings {
3839
pub host_chain_id: ChainId,
39-
pub database_url: String,
40+
pub database_url: DatabaseURL,
4041
pub database_pool_size: u32,
4142
pub verify_proof_req_db_channel: String,
4243

@@ -67,8 +68,7 @@ impl Default for ConfigSettings {
6768
fn default() -> Self {
6869
Self {
6970
host_chain_id: chain_id_from_env().unwrap_or(12345),
70-
database_url: std::env::var("DATABASE_URL")
71-
.unwrap_or("postgres://postgres:postgres@localhost/coprocessor".to_owned()),
71+
database_url: DatabaseURL::default(),
7272
database_pool_size: 16,
7373
verify_proof_req_db_channel: "event_zkpok_new_work".to_owned(),
7474
gw_url: "ws://127.0.0.1:8546".try_into().expect("Invalid URL"),

coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,15 @@ impl TestEnvironment {
132132
.await
133133
.expect("valid db instance");
134134
eprintln!("New test database on {}", instance.db_url());
135-
conf.database_url = instance.db_url().to_owned();
135+
conf.database_url = instance.db_url.clone();
136136
_test_instance = Some(instance);
137137
};
138138
conf.error_sleep_initial_secs = 1;
139139
conf.error_sleep_max_secs = 1;
140140
let db_pool = PgPoolOptions::new()
141141
.max_connections(16)
142142
.acquire_timeout(Duration::from_secs(5))
143-
.connect(&conf.database_url)
143+
.connect(conf.database_url.as_str())
144144
.await?;
145145

146146
// Delete all proofs from the database.

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use tokio_util::sync::CancellationToken;
2424

2525
use fhevm_engine_common::healthz_server::HttpServer as HealthHttpServer;
2626
use fhevm_engine_common::types::{BlockchainProvider, Handle};
27-
use fhevm_engine_common::utils::HeartBeat;
27+
use fhevm_engine_common::utils::{DatabaseURL, HeartBeat};
2828

2929
use crate::contracts::{AclContract, TfheContract};
3030
use crate::database::tfhe_event_propagate::{
@@ -58,7 +58,7 @@ pub struct Args {
5858
long,
5959
default_value = "postgresql://postgres:postgres@localhost:5432/coprocessor"
6060
)]
61-
pub database_url: String,
61+
pub database_url: DatabaseURL,
6262

6363
#[arg(long, default_value = None, help = "Can be negative from last block", allow_hyphen_values = true)]
6464
pub start_at_block: Option<i64>,
@@ -970,7 +970,7 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
970970
let mut log_iter = InfiniteLogIter::new(&args);
971971
let chain_id = log_iter.get_chain_id().await?;
972972
info!(chain_id = chain_id, "Chain ID");
973-
if args.database_url.is_empty() {
973+
if args.database_url.as_str().is_empty() {
974974
error!("Database URL is required");
975975
panic!("Database URL is required");
976976
};

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use anyhow::Result;
55
use fhevm_engine_common::telemetry;
66
use fhevm_engine_common::types::AllowEvents;
77
use fhevm_engine_common::types::SupportedFheOperations;
8+
use fhevm_engine_common::utils::DatabaseURL;
89
use fhevm_engine_common::utils::{compact_hex, HeartBeat};
910
use sqlx::postgres::PgConnectOptions;
1011
use sqlx::postgres::PgPoolOptions;
@@ -70,7 +71,7 @@ pub fn retry_on_sqlx_error(err: &SqlxError, retry_count: &mut usize) -> bool {
7071

7172
// A pool of connection with some cached information and automatic reconnection
7273
pub struct Database {
73-
url: String,
74+
url: DatabaseURL,
7475
pub pool: Arc<RwLock<sqlx::Pool<Postgres>>>,
7576
pub tenant_id: TenantId,
7677
pub chain_id: ChainId,
@@ -90,7 +91,7 @@ pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;
9091

9192
impl Database {
9293
pub async fn new(
93-
url: &str,
94+
url: &DatabaseURL,
9495
coprocessor_api_key: &CoprocessorApiKey,
9596
bucket_cache_size: u16,
9697
) -> Result<Self> {
@@ -105,7 +106,7 @@ impl Database {
105106
.into(),
106107
));
107108
Ok(Database {
108-
url: url.into(),
109+
url: url.clone(),
109110
tenant_id,
110111
chain_id,
111112
pool: Arc::new(RwLock::new(pool)),
@@ -114,7 +115,7 @@ impl Database {
114115
})
115116
}
116117

117-
async fn new_pool(url: &str) -> PgPool {
118+
async fn new_pool(url: &DatabaseURL) -> PgPool {
118119
let options: PgConnectOptions = url.parse().expect("bad url");
119120
let options = options.options([
120121
("statement_timeout", "10000"), // 5 seconds

coprocessor/fhevm-engine/host-listener/tests/integration_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ async fn setup(node_chain_id: Option<u64>) -> Result<Setup, anyhow::Error> {
217217
initial_block_time: 1,
218218
acl_contract_address: acl_contract.address().to_string(),
219219
tfhe_contract_address: tfhe_contract.address().to_string(),
220-
database_url: test_instance.db_url().to_string(),
220+
database_url: test_instance.db_url.clone(),
221221
coprocessor_api_key: Some(coprocessor_api_key),
222222
start_at_block: None,
223223
end_at_block: None,

coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@ fn handle_sigint(token: CancellationToken) {
1616
fn construct_config() -> Config {
1717
let args: utils::daemon_cli::Args = utils::daemon_cli::parse_args();
1818

19-
let db_url = args
20-
.database_url
21-
.clone()
22-
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined"));
19+
let db_url = args.database_url.clone().unwrap_or_default();
2320

2421
Config {
2522
tenant_api_key: args.tenant_api_key,

0 commit comments

Comments
 (0)