diff --git a/coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs b/coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs index 7228723b1..c30946b11 100644 --- a/coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs +++ b/coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs @@ -5,6 +5,10 @@ use std::time::Duration; use serde::{de::DeserializeOwned, Serialize}; use tfhe::{named::Named, prelude::ParameterSetConformant, Unversionize, Versionize}; +use sqlx::postgres::PgConnectOptions; +use std::fmt; +use std::str::FromStr; + use crate::types::FhevmError; pub const SAFE_SER_DESER_LIMIT: u64 = 1024 * 1024 * 16; @@ -110,3 +114,125 @@ impl Default for HeartBeat { Self::new() } } +/// Simple wrapper around Database URL string to provide +/// url constraints and masking functionality. +#[derive(Clone)] +pub struct DatabaseURL(String); + +impl From<&str> for DatabaseURL { + fn from(s: &str) -> Self { + let url = s.to_owned(); + let app_name = Self::default_app_name(); + Self::new_with_app_name(&url, &app_name) + } +} +impl From for DatabaseURL { + fn from(s: String) -> Self { + let url = s.to_owned(); + let app_name = Self::default_app_name(); + Self::new_with_app_name(&url, &app_name) + } +} + +impl Default for DatabaseURL { + fn default() -> Self { + let url = std::env::var("DATABASE_URL") + .unwrap_or("postgres://postgres:postgres@localhost:5432/coprocessor".to_owned()); + + let app_name = Self::default_app_name(); + Self::new_with_app_name(&url, &app_name) + } +} + +impl DatabaseURL { + /// Create a new DatabaseURL, appending application_name if not present + /// If the base URL already contains an application_name, it will be preserved. + /// + /// application_name is useful for identifying the source of DB conns + pub fn new_with_app_name(base: &str, app_name: &str) -> Self { + let app_name = app_name.trim(); + if app_name.is_empty() { + return Self(base.to_owned()); + } + + // Append application_name if not present + let mut url = base.to_owned(); + if !url.contains("application_name=") { + if url.contains('?') { + url.push_str(&format!("&application_name={}", app_name)); + } else { + url.push_str(&format!("?application_name={}", app_name)); + } + } + let url: Self = Self(url); + let _ = url.parse().expect("DatabaseURL should be valid"); + url + } + + /// Get default app name from the executable name + fn default_app_name() -> String { + std::env::args() + .next() + .and_then(|path| { + std::path::Path::new(&path) + .file_name() + .map(|s| s.to_string_lossy().into_owned()) + }) + .unwrap_or_default() + } + + pub fn as_str(&self) -> &str { + self.0.as_str() + } + + pub fn into_inner(self) -> String { + self.0 + } + + fn mask_password(options: &PgConnectOptions) -> String { + let new_url = format!( + "postgres://{}:{}@{}:{}/{}?application_name={}", + options.get_username(), + "*****", + options.get_host(), + options.get_port(), + options.get_database().unwrap_or_default(), + options.get_application_name().unwrap_or_default() + ); + new_url + } + + pub fn parse(&self) -> Result { + PgConnectOptions::from_str(self.as_str()) + } +} + +impl fmt::Display for DatabaseURL { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match PgConnectOptions::from_str(self.as_str()) { + Ok(options) => { + write!(f, "{:?}", Self::mask_password(&options)) + } + Err(_) => write!(f, "Invalid DatabaseURL"), + } + } +} + +impl fmt::Debug for DatabaseURL { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match PgConnectOptions::from_str(self.as_str()) { + Ok(options) => { + write!(f, "{:?}", options.password("*****")) + } + Err(_) => write!(f, "Invalid DatabaseURL"), + } + } +} +impl FromStr for DatabaseURL { + type Err = sqlx::Error; + + fn from_str(s: &str) -> Result { + let _ = PgConnectOptions::from_str(s)?; + Ok(Self(s.to_owned())) + } +} diff --git a/coprocessor/fhevm-engine/fhevm-engine-common/tests/utils.rs b/coprocessor/fhevm-engine/fhevm-engine-common/tests/utils.rs new file mode 100644 index 000000000..7a8bf86d3 --- /dev/null +++ b/coprocessor/fhevm-engine/fhevm-engine-common/tests/utils.rs @@ -0,0 +1,38 @@ +use fhevm_engine_common::utils::DatabaseURL; + +#[tokio::test] +async fn mask_database_url() { + let db_url: DatabaseURL = "postgres://postgres:mypassword@localhost:5432/coprocessor".into(); + + let debug_fmt = format!("{:?}", db_url); + assert!(!debug_fmt.contains("mypassword")); + + let display_fmt = format!("{}", db_url); + assert!(!display_fmt.contains("mypassword")); + println!("DatabaseURL: {}", db_url); + + let db_url: DatabaseURL = DatabaseURL::new_with_app_name( + "postgres://user:secret@dbhost:5432/mydb?sslmode=require", + "tfhe-worker", + ); + + assert_eq!( + db_url.as_str(), + "postgres://user:secret@dbhost:5432/mydb?sslmode=require&application_name=tfhe-worker" + ); + + let db_url: DatabaseURL = + DatabaseURL::new_with_app_name("postgres://user:secret@dbhost:5432/mydb", "tfhe-worker"); + + assert_eq!( + db_url.as_str(), + "postgres://user:secret@dbhost:5432/mydb?application_name=tfhe-worker" + ); + + println!("DatabaseURL: {}", db_url); + + let db_url: DatabaseURL = + DatabaseURL::new_with_app_name("postgres://user:secret@dbhost:5432/mydb", " "); + + assert_eq!(db_url.as_str(), "postgres://user:secret@dbhost:5432/mydb"); +} diff --git a/coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs b/coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs index 57ddf2e10..3e2983c64 100644 --- a/coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs +++ b/coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs @@ -3,7 +3,7 @@ use std::time::Duration; use alloy::providers::{ProviderBuilder, WsConnect}; use alloy::{primitives::Address, transports::http::reqwest::Url}; use clap::Parser; -use fhevm_engine_common::{metrics_server, telemetry}; +use fhevm_engine_common::{metrics_server, telemetry, utils::DatabaseURL}; use gw_listener::aws_s3::AwsS3Client; use gw_listener::chain_id_from_env; use gw_listener::gw_listener::GatewayListener; @@ -18,7 +18,7 @@ use tracing::{error, info, Level}; #[command(version, about, long_about = None)] struct Conf { #[arg(long)] - database_url: Option, + database_url: Option, #[arg(long, default_value_t = 16)] database_pool_size: u32, @@ -105,18 +105,14 @@ async fn main() -> anyhow::Result<()> { .with_max_level(conf.log_level) .init(); - info!(conf = ?conf, "Starting gw_listener"); - if !conf.service_name.is_empty() { if let Err(err) = telemetry::setup_otlp(&conf.service_name) { error!(error = %err, "Failed to setup OTLP"); } } - let database_url = conf - .database_url - .clone() - .unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined")); + info!(gateway_url = %conf.gw_url, max_retries = %conf.provider_max_retries, + retry_interval = ?conf.provider_retry_interval, "Connecting to Gateway"); let provider = loop { match ProviderBuilder::new() @@ -152,7 +148,7 @@ async fn main() -> anyhow::Result<()> { }; let config = ConfigSettings { host_chain_id, - database_url, + database_url: conf.database_url.clone().unwrap_or_default(), database_pool_size: conf.database_pool_size, verify_proof_req_db_channel: conf.verify_proof_req_database_channel, gw_url: conf.gw_url, diff --git a/coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs b/coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs index 66ac2e3af..52e22df63 100644 --- a/coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs +++ b/coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs @@ -81,7 +81,7 @@ impl + Clone + 'static, A: AwsS3Interface + Clone + 'stati ); let db_pool = PgPoolOptions::new() .max_connections(self.conf.database_pool_size) - .connect(&self.conf.database_url) + .connect(self.conf.database_url.as_str()) .await?; let input_verification_handle = { @@ -549,7 +549,7 @@ impl + Clone + 'static, A: AwsS3Interface + Clone + 'stati // Check database connection let db_pool_result = PgPoolOptions::new() .max_connections(self.conf.database_pool_size) - .connect(&self.conf.database_url) + .connect(self.conf.database_url.as_str()) .await; match db_pool_result { diff --git a/coprocessor/fhevm-engine/gw-listener/src/lib.rs b/coprocessor/fhevm-engine/gw-listener/src/lib.rs index 4ca31ede3..9e1b35c72 100644 --- a/coprocessor/fhevm-engine/gw-listener/src/lib.rs +++ b/coprocessor/fhevm-engine/gw-listener/src/lib.rs @@ -1,5 +1,6 @@ use alloy::primitives::Uint; use alloy::transports::http::reqwest::Url; +use fhevm_engine_common::utils::DatabaseURL; use std::time::Duration; use tracing::error; @@ -36,7 +37,7 @@ impl TryFrom for KeyType { #[derive(Clone, Debug)] pub struct ConfigSettings { pub host_chain_id: ChainId, - pub database_url: String, + pub database_url: DatabaseURL, pub database_pool_size: u32, pub verify_proof_req_db_channel: String, @@ -67,8 +68,7 @@ impl Default for ConfigSettings { fn default() -> Self { Self { host_chain_id: chain_id_from_env().unwrap_or(12345), - database_url: std::env::var("DATABASE_URL") - .unwrap_or("postgres://postgres:postgres@localhost/coprocessor".to_owned()), + database_url: DatabaseURL::default(), database_pool_size: 16, verify_proof_req_db_channel: "event_zkpok_new_work".to_owned(), gw_url: "ws://127.0.0.1:8546".try_into().expect("Invalid URL"), diff --git a/coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs b/coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs index edb9fe9b9..d38ee6745 100644 --- a/coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs +++ b/coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs @@ -132,7 +132,7 @@ impl TestEnvironment { .await .expect("valid db instance"); eprintln!("New test database on {}", instance.db_url()); - conf.database_url = instance.db_url().to_owned(); + conf.database_url = instance.db_url.clone(); _test_instance = Some(instance); }; conf.error_sleep_initial_secs = 1; @@ -140,7 +140,7 @@ impl TestEnvironment { let db_pool = PgPoolOptions::new() .max_connections(16) .acquire_timeout(Duration::from_secs(5)) - .connect(&conf.database_url) + .connect(conf.database_url.as_str()) .await?; // Delete all proofs from the database. diff --git a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs index 962b65a92..5c5c5ff19 100644 --- a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs @@ -24,7 +24,7 @@ use tokio_util::sync::CancellationToken; use fhevm_engine_common::healthz_server::HttpServer as HealthHttpServer; use fhevm_engine_common::types::{BlockchainProvider, Handle}; -use fhevm_engine_common::utils::HeartBeat; +use fhevm_engine_common::utils::{DatabaseURL, HeartBeat}; use crate::contracts::{AclContract, TfheContract}; use crate::database::tfhe_event_propagate::{ @@ -58,7 +58,7 @@ pub struct Args { long, default_value = "postgresql://postgres:postgres@localhost:5432/coprocessor" )] - pub database_url: String, + pub database_url: DatabaseURL, #[arg(long, default_value = None, help = "Can be negative from last block", allow_hyphen_values = true)] pub start_at_block: Option, @@ -970,7 +970,7 @@ pub async fn main(args: Args) -> anyhow::Result<()> { let mut log_iter = InfiniteLogIter::new(&args); let chain_id = log_iter.get_chain_id().await?; info!(chain_id = chain_id, "Chain ID"); - if args.database_url.is_empty() { + if args.database_url.as_str().is_empty() { error!("Database URL is required"); panic!("Database URL is required"); }; diff --git a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs index 29de82558..3f12709c8 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs @@ -5,6 +5,7 @@ use anyhow::Result; use fhevm_engine_common::telemetry; use fhevm_engine_common::types::AllowEvents; use fhevm_engine_common::types::SupportedFheOperations; +use fhevm_engine_common::utils::DatabaseURL; use fhevm_engine_common::utils::{compact_hex, HeartBeat}; use sqlx::postgres::PgConnectOptions; use sqlx::postgres::PgPoolOptions; @@ -70,7 +71,7 @@ pub fn retry_on_sqlx_error(err: &SqlxError, retry_count: &mut usize) -> bool { // A pool of connection with some cached information and automatic reconnection pub struct Database { - url: String, + url: DatabaseURL, pub pool: Arc>>, pub tenant_id: TenantId, pub chain_id: ChainId, @@ -90,7 +91,7 @@ pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>; impl Database { pub async fn new( - url: &str, + url: &DatabaseURL, coprocessor_api_key: &CoprocessorApiKey, bucket_cache_size: u16, ) -> Result { @@ -105,7 +106,7 @@ impl Database { .into(), )); Ok(Database { - url: url.into(), + url: url.clone(), tenant_id, chain_id, pool: Arc::new(RwLock::new(pool)), @@ -114,7 +115,7 @@ impl Database { }) } - async fn new_pool(url: &str) -> PgPool { + async fn new_pool(url: &DatabaseURL) -> PgPool { let options: PgConnectOptions = url.parse().expect("bad url"); let options = options.options([ ("statement_timeout", "10000"), // 5 seconds diff --git a/coprocessor/fhevm-engine/host-listener/tests/integration_test.rs b/coprocessor/fhevm-engine/host-listener/tests/integration_test.rs index 1d541ae2d..d8eed7698 100644 --- a/coprocessor/fhevm-engine/host-listener/tests/integration_test.rs +++ b/coprocessor/fhevm-engine/host-listener/tests/integration_test.rs @@ -217,7 +217,7 @@ async fn setup(node_chain_id: Option) -> Result { initial_block_time: 1, acl_contract_address: acl_contract.address().to_string(), tfhe_contract_address: tfhe_contract.address().to_string(), - database_url: test_instance.db_url().to_string(), + database_url: test_instance.db_url.clone(), coprocessor_api_key: Some(coprocessor_api_key), start_at_block: None, end_at_block: None, diff --git a/coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs b/coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs index af72d0a26..62df5bd1a 100644 --- a/coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs +++ b/coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs @@ -16,10 +16,7 @@ fn handle_sigint(token: CancellationToken) { fn construct_config() -> Config { let args: utils::daemon_cli::Args = utils::daemon_cli::parse_args(); - let db_url = args - .database_url - .clone() - .unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined")); + let db_url = args.database_url.clone().unwrap_or_default(); Config { tenant_api_key: args.tenant_api_key, diff --git a/coprocessor/fhevm-engine/sns-worker/src/bin/utils/daemon_cli.rs b/coprocessor/fhevm-engine/sns-worker/src/bin/utils/daemon_cli.rs index e517a8922..396dbe2a3 100644 --- a/coprocessor/fhevm-engine/sns-worker/src/bin/utils/daemon_cli.rs +++ b/coprocessor/fhevm-engine/sns-worker/src/bin/utils/daemon_cli.rs @@ -2,6 +2,7 @@ use std::time::Duration; use clap::{command, Parser}; use fhevm_engine_common::telemetry::MetricsConfig; +use fhevm_engine_common::utils::DatabaseURL; use humantime::parse_duration; use sns_worker::{SchedulePolicy, SNS_LATENCY_OP_HISTOGRAM_CONF}; use tracing::Level; @@ -48,7 +49,7 @@ pub struct Args { /// Postgres database url. If unspecified DATABASE_URL environment variable /// is used #[arg(long)] - pub database_url: Option, + pub database_url: Option, /// KeySet file. If unspecified the the keys are read from the database #[arg(long)] diff --git a/coprocessor/fhevm-engine/sns-worker/src/lib.rs b/coprocessor/fhevm-engine/sns-worker/src/lib.rs index c8bb63912..1c11007c8 100644 --- a/coprocessor/fhevm-engine/sns-worker/src/lib.rs +++ b/coprocessor/fhevm-engine/sns-worker/src/lib.rs @@ -23,7 +23,7 @@ use fhevm_engine_common::{ telemetry::{self, OtelTracer}, telemetry::{register_histogram, MetricsConfig}, types::FhevmError, - utils::compact_hex, + utils::{compact_hex, DatabaseURL}, }; use futures::join; use serde::{Deserialize, Serialize}; @@ -60,7 +60,7 @@ pub struct KeySet { #[derive(Clone)] pub struct DBConfig { - pub url: String, + pub url: DatabaseURL, pub listen_channels: Vec, pub notify_channel: String, pub batch_limit: u32, @@ -75,17 +75,6 @@ pub struct DBConfig { pub lifo: bool, } -impl std::fmt::Debug for DBConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // Custom debug impl to avoid printing sensitive information - write!( - f, - "db_listen_channel: {:?}, db_notify_channel: {}, db_batch_limit: {}, db_gc_batch_limit: {}, db_polling_interval: {}, db_cleanup_interval: {:?}, db_max_connections: {}, db_timeout: {:?}, lifo: {}", - self.listen_channels, self.notify_channel, self.batch_limit, self.gc_batch_limit, self.polling_interval, self.cleanup_interval, self.max_connections, self.timeout, self.lifo - ) - } -} - #[derive(Clone, Default, Debug)] pub struct S3Config { pub bucket_ct128: String, @@ -109,7 +98,7 @@ pub struct HealthCheckConfig { pub port: u16, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct Config { pub tenant_api_key: String, pub service_name: String, @@ -510,7 +499,7 @@ pub async fn run_all( mpsc::channel::(10 * config.s3.max_concurrent_uploads as usize); let rayon_threads = rayon::current_num_threads(); - info!(config = ?config, rayon_threads, "Starting SNS worker"); + info!(config = %config, rayon_threads, "Starting SNS worker"); if !config.service_name.is_empty() { if let Err(err) = telemetry::setup_otlp(&config.service_name) { diff --git a/coprocessor/fhevm-engine/sns-worker/src/tests/mod.rs b/coprocessor/fhevm-engine/sns-worker/src/tests/mod.rs index 1a92e3e8a..46363aa15 100644 --- a/coprocessor/fhevm-engine/sns-worker/src/tests/mod.rs +++ b/coprocessor/fhevm-engine/sns-worker/src/tests/mod.rs @@ -6,7 +6,7 @@ use crate::{ }; use anyhow::{anyhow, Ok}; use aws_config::BehaviorVersion; -use fhevm_engine_common::utils::compact_hex; +use fhevm_engine_common::utils::{compact_hex, DatabaseURL}; use serde::{Deserialize, Serialize}; use serial_test::serial; use std::{ @@ -422,13 +422,13 @@ async fn setup(enable_compression: bool) -> anyhow::Result { .await .expect("valid db instance"); - let conf = build_test_config(db_instance.db_url().to_owned(), enable_compression); + let conf = build_test_config(db_instance.db_url.clone(), enable_compression); // Set up the database connection pool let pool = sqlx::postgres::PgPoolOptions::new() .max_connections(conf.db.max_connections) .acquire_timeout(conf.db.timeout) - .connect(&conf.db.url) + .connect(conf.db.url.as_str()) .await?; // Set up S3 storage @@ -709,7 +709,7 @@ async fn assert_ciphertext_s3_object_count( .await; } -fn build_test_config(db_url: String, enable_compression: bool) -> Config { +fn build_test_config(url: DatabaseURL, enable_compression: bool) -> Config { let batch_limit = std::env::var("BATCH_LIMIT") .ok() .and_then(|v| v.parse::().ok()) @@ -723,7 +723,7 @@ fn build_test_config(db_url: String, enable_compression: bool) -> Config { Config { tenant_api_key: TENANT_API_KEY.to_string(), db: DBConfig { - url: db_url, + url, listen_channels: vec![LISTEN_CHANNEL.to_string()], notify_channel: "fhevm".to_string(), batch_limit, diff --git a/coprocessor/fhevm-engine/stress-test-generator/data/json/minitest_002_erc20.json b/coprocessor/fhevm-engine/stress-test-generator/data/json/minitest_002_erc20.json index 15c7803ad..65dae4f24 100644 --- a/coprocessor/fhevm-engine/stress-test-generator/data/json/minitest_002_erc20.json +++ b/coprocessor/fhevm-engine/stress-test-generator/data/json/minitest_002_erc20.json @@ -14,4 +14,4 @@ ] ] } -] +] \ No newline at end of file diff --git a/coprocessor/fhevm-engine/stress-test-generator/src/bin/stress_generator.rs b/coprocessor/fhevm-engine/stress-test-generator/src/bin/stress_generator.rs index 6786a8db7..e94286e3b 100644 --- a/coprocessor/fhevm-engine/stress-test-generator/src/bin/stress_generator.rs +++ b/coprocessor/fhevm-engine/stress-test-generator/src/bin/stress_generator.rs @@ -5,6 +5,7 @@ use axum::{ Json, Router, }; use chrono::{DateTime, Utc}; +use fhevm_engine_common::utils::DatabaseURL; use host_listener::database::tfhe_event_propagate::{Database as ListenerDatabase, Handle}; use sqlx::Postgres; @@ -416,16 +417,17 @@ async fn generate_transactions_at_rate( scenario: &Scenario, ) -> Result<(), Box> { let ecfg = EnvConfig::new(); + let database_url: DatabaseURL = ecfg.evgen_db_url.into(); let coprocessor_api_key = sqlx::types::Uuid::parse_str(&ecfg.api_key).unwrap(); let mut listener_event_to_db = ListenerDatabase::new( - &ecfg.evgen_db_url, + &database_url, &coprocessor_api_key, default_dependence_cache_size(), ) .await?; let pool = sqlx::postgres::PgPoolOptions::new() .max_connections(2) - .connect(&ecfg.evgen_db_url) + .connect(database_url.as_str()) .await .unwrap(); let mut dependence_handle1: Option = None; @@ -497,16 +499,17 @@ async fn generate_transactions_count( scenario: &Scenario, ) -> Result<(), Box> { let ecfg = ctx.ecfg.clone(); + let database_url: DatabaseURL = ecfg.evgen_db_url.into(); let coprocessor_api_key = sqlx::types::Uuid::parse_str(&ecfg.api_key).unwrap(); let mut listener_event_to_db = ListenerDatabase::new( - &ecfg.evgen_db_url, + &database_url, &coprocessor_api_key, default_dependence_cache_size(), ) .await?; let pool = sqlx::postgres::PgPoolOptions::new() .max_connections(2) - .connect(&ecfg.evgen_db_url) + .connect(database_url.as_str()) .await .unwrap(); diff --git a/coprocessor/fhevm-engine/test-harness/src/instance.rs b/coprocessor/fhevm-engine/test-harness/src/instance.rs index 2b794f144..5c206fdbb 100644 --- a/coprocessor/fhevm-engine/test-harness/src/instance.rs +++ b/coprocessor/fhevm-engine/test-harness/src/instance.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use crate::db_utils::setup_test_user; +use fhevm_engine_common::utils::DatabaseURL; use sqlx::postgres::types::Oid; use sqlx::Row; use testcontainers::{core::WaitFor, runners::AsyncRunner, GenericImage, ImageExt}; @@ -10,7 +11,7 @@ use tracing::info; #[derive(Clone)] pub struct DBInstance { _container: Option>>, - db_url: String, + pub db_url: DatabaseURL, pub parent_token: CancellationToken, } @@ -58,22 +59,21 @@ async fn setup_test_app_existing_localhost( with_reset: bool, mode: ImportMode, ) -> Result> { - let db_url = "postgresql://postgres:postgres@127.0.0.1:5432/coprocessor"; - let db_url = std::env::var("DATABASE_URL").unwrap_or(db_url.to_string()); + let db_url = DatabaseURL::default(); if with_reset { info!("Resetting local database at {db_url}"); - let admin_db_url = db_url.replace("coprocessor", "postgres"); - create_database(&admin_db_url, &db_url, mode).await?; + let admin_db_url = db_url.to_string().replace("coprocessor", "postgres"); + create_database(&admin_db_url, db_url.as_str(), mode).await?; } info!("Using existing local database at {db_url}"); - let _ = get_sns_pk_size(&sqlx::PgPool::connect(&db_url).await?, 12345).await; + let _ = get_sns_pk_size(&sqlx::PgPool::connect(db_url.as_str()).await?, 12345).await; Ok(DBInstance { _container: None, - db_url: db_url.to_string(), + db_url, parent_token: CancellationToken::new(), }) } @@ -102,7 +102,7 @@ async fn setup_test_app_custom_docker( Ok(DBInstance { _container: Some(Arc::new(container)), - db_url, + db_url: db_url.into(), parent_token: CancellationToken::new(), }) } diff --git a/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs b/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs index dc191d224..191ce2077 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs @@ -100,7 +100,7 @@ async fn start_coprocessor(rx: Receiver, app_port: u16, db_url: &str) { pg_pool_max_connections: 2, server_addr: format!("127.0.0.1:{app_port}"), metrics_addr: None, - database_url: Some(db_url.to_string()), + database_url: Some(db_url.into()), maximum_compact_inputs_upload: 10, coprocessor_private_key: "./coprocessor.key".to_string(), service_name: "coprocessor".to_string(), diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs b/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs index 2bf74a822..430d5b5e5 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs @@ -1,5 +1,6 @@ use clap::Parser; use fhevm_engine_common::telemetry::MetricsConfig; +use fhevm_engine_common::utils::DatabaseURL; use tracing::Level; #[derive(Parser, Debug, Clone)] @@ -71,7 +72,7 @@ pub struct Args { /// Postgres database url. If unspecified DATABASE_URL environment variable is used #[arg(long)] - pub database_url: Option, + pub database_url: Option, /// Coprocessor private key file path. /// Private key is in plain text 0x1234.. format. diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/health_check.rs b/coprocessor/fhevm-engine/tfhe-worker/src/health_check.rs index 6e54ba447..0bbc09cad 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/health_check.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/health_check.rs @@ -3,7 +3,7 @@ use std::time::Duration; use fhevm_engine_common::healthz_server::{ default_get_version, HealthCheckService, HealthStatus, Version, }; -use fhevm_engine_common::utils::HeartBeat; +use fhevm_engine_common::utils::{DatabaseURL, HeartBeat}; const ACTIVITY_FRESHNESS: Duration = Duration::from_secs(10); // Not alive if tick is older const CONNECTED_TICK_FRESHNESS: Duration = Duration::from_secs(5); // Need to check connection if tick is older @@ -11,13 +11,13 @@ const CONNECTED_TICK_FRESHNESS: Duration = Duration::from_secs(5); // Need to ch /// Represents the health status of the transaction sender service #[derive(Clone, Debug)] pub struct HealthCheck { - pub database_url: String, + pub database_url: DatabaseURL, pub database_heartbeat: HeartBeat, pub activity_heartbeat: HeartBeat, } impl HealthCheck { - pub fn new(database_url: String) -> Self { + pub fn new(database_url: DatabaseURL) -> Self { // A lazy pool is used to avoid blocking the main thread during initialization or bad database URL Self { database_url, @@ -47,7 +47,7 @@ impl HealthCheckService for HealthCheck { let pool = sqlx::postgres::PgPoolOptions::new() .acquire_timeout(Duration::from_secs(5)) .max_connections(1) - .connect(&self.database_url); + .connect(self.database_url.as_str()); if let Ok(pool) = pool.await { status.set_db_connected(&pool).await; } else { diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs b/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs index 2b1149618..398403bb5 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs @@ -70,11 +70,8 @@ pub async fn async_main( } } - let health_check = health_check::HealthCheck::new( - args.database_url - .clone() - .unwrap_or("no_database_url".to_string()), - ); + let database_url = args.database_url.clone().unwrap_or_default(); + let health_check = health_check::HealthCheck::new(database_url); let mut set = JoinSet::new(); if args.run_server { diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/server.rs b/coprocessor/fhevm-engine/tfhe-worker/src/server.rs index cd8dc8f6d..f28afabb2 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/server.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/server.rs @@ -105,17 +105,17 @@ pub async fn run_server_iteration( .server_addr .parse() .expect("Can't parse server address"); - let db_url = crate::utils::db_url(&args); let coprocessor_key_file = tokio::fs::read_to_string(&args.coprocessor_private_key).await?; let signer = PrivateKeySigner::from_str(coprocessor_key_file.trim())?; info!(target: "grpc_server", { address = signer.address().to_string() }, "Coprocessor signer initiated"); + let database_url = args.database_url.clone().unwrap_or_default(); info!("Coprocessor listening on {}", addr); let pool = sqlx::postgres::PgPoolOptions::new() .max_connections(args.pg_pool_max_connections) - .connect(&db_url) + .connect(database_url.as_str()) .await?; let tenant_key_cache: std::sync::Arc>> = diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs index a7a313db8..ac36e7da0 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs @@ -258,10 +258,14 @@ fn next_handle() -> Handle { async fn listener_event_to_db(app: &TestInstance) -> ListenerDatabase { let coprocessor_api_key = sqlx::types::Uuid::parse_str(default_api_key()).unwrap(); - let url = app.db_url().to_string(); - ListenerDatabase::new(&url, &coprocessor_api_key, default_dependence_cache_size()) - .await - .unwrap() + + ListenerDatabase::new( + &app.db_url().into(), + &coprocessor_api_key, + default_dependence_cache_size(), + ) + .await + .unwrap() } #[tokio::test] diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs index 8b0c6e976..b74fae881 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs @@ -109,7 +109,7 @@ async fn start_coprocessor(rx: Receiver, app_port: u16, db_url: &str) { pg_pool_max_connections: 2, server_addr: format!("127.0.0.1:{app_port}"), metrics_addr: None, - database_url: Some(db_url.to_string()), + database_url: Some(db_url.into()), maximum_compact_inputs_upload: 10, coprocessor_private_key: "./coprocessor.key".to_string(), service_name: "coprocessor".to_string(), diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs index 26a302714..a480ef2a0 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs @@ -78,10 +78,11 @@ async fn tfhe_worker_cycle( std::sync::Arc::new(tokio::sync::RwLock::new(lru::LruCache::new( NonZeroUsize::new(args.tenant_key_cache_size as usize).unwrap(), ))); - let db_url = crate::utils::db_url(args); + let db_url = args.database_url.clone().unwrap_or_default(); + let pool = sqlx::postgres::PgPoolOptions::new() .max_connections(args.pg_pool_max_connections) - .connect(&db_url) + .connect(db_url.as_str()) .await?; let mut listener = PgListener::connect_with(&pool).await?; listener.listen("work_available").await?; diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/utils.rs b/coprocessor/fhevm-engine/tfhe-worker/src/utils.rs index 6c91bdb76..089fa5f71 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/utils.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/utils.rs @@ -168,13 +168,6 @@ pub fn sort_computations_by_dependencies( Ok((res, handles_to_check_in_db)) } -pub fn db_url(args: &crate::daemon_cli::Args) -> String { - if let Some(db_url) = &args.database_url { - return db_url.clone(); - } - std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined") -} - #[test] fn test_invalid_handle_too_short() { let comp = vec![AsyncComputation { diff --git a/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs b/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs index 68a7172b8..baf04acd9 100644 --- a/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs +++ b/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs @@ -21,6 +21,7 @@ use transaction_sender::{ use fhevm_engine_common::{ metrics_server, telemetry::{self, MetricsConfig}, + utils::DatabaseURL, }; use humantime::parse_duration; @@ -52,7 +53,7 @@ struct Conf { private_key: Option, #[arg(short, long)] - database_url: Option, + database_url: Option, #[arg(long, default_value = "10")] database_pool_size: u32, @@ -227,10 +228,7 @@ async fn main() -> anyhow::Result<()> { } } let wallet = EthereumWallet::new(abstract_signer.clone()); - let database_url = match conf.database_url.clone() { - Some(url) => url, - None => std::env::var("DATABASE_URL").context("DATABASE_URL is undefined")?, - }; + let database_url = conf.database_url.clone(); let provider = loop { if cancel_token.is_cancelled() { diff --git a/coprocessor/fhevm-engine/transaction-sender/src/config.rs b/coprocessor/fhevm-engine/transaction-sender/src/config.rs index d793a7a73..92118ea73 100644 --- a/coprocessor/fhevm-engine/transaction-sender/src/config.rs +++ b/coprocessor/fhevm-engine/transaction-sender/src/config.rs @@ -1,8 +1,10 @@ use std::time::Duration; +use fhevm_engine_common::utils::DatabaseURL; + #[derive(Clone, Debug)] pub struct ConfigSettings { - pub database_url: String, + pub database_url: Option, pub database_pool_size: u32, pub verify_proof_resp_db_channel: String, @@ -46,8 +48,7 @@ pub struct ConfigSettings { impl Default for ConfigSettings { fn default() -> Self { Self { - database_url: std::env::var("DATABASE_URL") - .unwrap_or("postgres://postgres:postgres@localhost/coprocessor".to_owned()), + database_url: Some(DatabaseURL::default()), database_pool_size: 10, verify_proof_resp_db_channel: "event_zkpok_computed".to_owned(), add_ciphertexts_db_channel: "event_ciphertexts_uploaded".to_owned(), diff --git a/coprocessor/fhevm-engine/transaction-sender/src/transaction_sender.rs b/coprocessor/fhevm-engine/transaction-sender/src/transaction_sender.rs index 1d8405188..f79b3242f 100644 --- a/coprocessor/fhevm-engine/transaction-sender/src/transaction_sender.rs +++ b/coprocessor/fhevm-engine/transaction-sender/src/transaction_sender.rs @@ -35,9 +35,11 @@ impl + Clone + 'static> TransactionSender

{ conf: ConfigSettings, gas: Option, ) -> anyhow::Result { + let database_url = conf.database_url.to_owned().unwrap_or_default(); + let db_pool = sqlx::postgres::PgPoolOptions::new() .max_connections(conf.database_pool_size) - .connect(&conf.database_url) + .connect(database_url.as_str()) .await?; let operations: Vec>> = vec![ @@ -81,7 +83,6 @@ impl + Clone + 'static> TransactionSender

{ pub async fn run(&self) -> anyhow::Result<()> { info!( - conf = ?self.conf, input_verification_address = %self.input_verification_address, ciphertext_commits_address = %self.ciphertext_commits_address, multichain_acl_address = %self.multichain_acl_address, diff --git a/coprocessor/fhevm-engine/transaction-sender/tests/common.rs b/coprocessor/fhevm-engine/transaction-sender/tests/common.rs index 1392c08f7..d45f8e70d 100644 --- a/coprocessor/fhevm-engine/transaction-sender/tests/common.rs +++ b/coprocessor/fhevm-engine/transaction-sender/tests/common.rs @@ -78,10 +78,11 @@ impl TestEnvironment { .with_max_level(Level::DEBUG) .with_test_writer() .try_init(); + let database_url = conf.database_url.to_owned().unwrap_or_default(); let db_pool = PgPoolOptions::new() .max_connections(1) - .connect(&conf.database_url) + .connect(database_url.as_str()) .await?; Self::truncate_tables( diff --git a/coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs b/coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs index 8a170ce28..ad7a53af7 100644 --- a/coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs +++ b/coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs @@ -1,6 +1,6 @@ use clap::{command, Parser}; use fhevm_engine_common::telemetry::{self, MetricsConfig}; -use fhevm_engine_common::{healthz_server::HttpServer, metrics_server}; +use fhevm_engine_common::{healthz_server::HttpServer, metrics_server, utils::DatabaseURL}; use humantime::parse_duration; use std::{sync::Arc, time::Duration}; use tokio::{join, task}; @@ -41,7 +41,7 @@ pub struct Args { /// Postgres database url. If unspecified DATABASE_URL environment variable /// is used #[arg(long)] - pub database_url: Option, + pub database_url: Option, /// Number of zkproof workers to process proofs in parallel #[arg(long, default_value_t = 8)] @@ -90,10 +90,7 @@ async fn main() { .with_max_level(args.log_level) .init(); - let database_url = args - .database_url - .clone() - .unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined")); + let database_url = args.database_url.clone().unwrap_or_default(); let conf = zkproof_worker::Config { database_url, diff --git a/coprocessor/fhevm-engine/zkproof-worker/src/lib.rs b/coprocessor/fhevm-engine/zkproof-worker/src/lib.rs index 1045214bd..d3cf39e3e 100644 --- a/coprocessor/fhevm-engine/zkproof-worker/src/lib.rs +++ b/coprocessor/fhevm-engine/zkproof-worker/src/lib.rs @@ -5,6 +5,7 @@ mod tests; pub mod verifier; use std::{ + fmt::{self, Display}, io, sync::{LazyLock, OnceLock}, time::Duration, @@ -14,6 +15,7 @@ use fhevm_engine_common::{ pg_pool::ServiceError, telemetry::{register_histogram, MetricsConfig}, types::FhevmError, + utils::DatabaseURL, }; use prometheus::Histogram; use thiserror::Error; @@ -77,7 +79,7 @@ impl From for ServiceError { #[derive(Default, Debug, Clone)] pub struct Config { - pub database_url: String, + pub database_url: DatabaseURL, pub listen_database_channel: String, pub notify_database_channel: String, pub pg_pool_connections: u32, @@ -96,3 +98,19 @@ pub static ZKVERIFY_OP_LATENCY_HISTOGRAM: LazyLock = LazyLock::new(|| "ZK verification latencies in seconds", ) }); +impl Display for Config { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Config {{ database_url: {}, listen_database_channel: {}, notify_database_channel: {}, pg_pool_connections: {}, pg_polling_interval: {}, pg_timeout: {:?}, pg_auto_explain_with_min_duration: {:?}, worker_thread_count: {} }}", + self.database_url, + self.listen_database_channel, + self.notify_database_channel, + self.pg_pool_connections, + self.pg_polling_interval, + self.pg_timeout, + self.pg_auto_explain_with_min_duration, + self.worker_thread_count + ) + } +} diff --git a/coprocessor/fhevm-engine/zkproof-worker/src/tests/utils.rs b/coprocessor/fhevm-engine/zkproof-worker/src/tests/utils.rs index 57fc6e54e..5fee67a34 100644 --- a/coprocessor/fhevm-engine/zkproof-worker/src/tests/utils.rs +++ b/coprocessor/fhevm-engine/zkproof-worker/src/tests/utils.rs @@ -15,7 +15,7 @@ pub async fn setup() -> anyhow::Result<(PostgresPoolManager, DBInstance)> { .expect("valid db instance"); let conf = crate::Config { - database_url: test_instance.db_url().to_owned(), + database_url: test_instance.db_url.clone(), listen_database_channel: "fhevm".to_string(), notify_database_channel: "notify".to_string(), pg_pool_connections: 10, diff --git a/coprocessor/fhevm-engine/zkproof-worker/src/verifier.rs b/coprocessor/fhevm-engine/zkproof-worker/src/verifier.rs index 8ed3ffe29..6b572dabc 100644 --- a/coprocessor/fhevm-engine/zkproof-worker/src/verifier.rs +++ b/coprocessor/fhevm-engine/zkproof-worker/src/verifier.rs @@ -125,7 +125,7 @@ pub async fn execute_verify_proofs_loop( conf: Config, last_active_at: Arc>, ) -> Result<(), ExecutionError> { - info!(conf = ?conf, "Starting with config"); + info!(conf = %conf, "Starting with config"); // Tenants key cache is shared amongst all workers let tenant_key_cache = Arc::new(RwLock::new(LruCache::new(