Skip to content
Merged
126 changes: 126 additions & 0 deletions coprocessor/fhevm-engine/fhevm-engine-common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use std::time::Duration;
use serde::{de::DeserializeOwned, Serialize};
use tfhe::{named::Named, prelude::ParameterSetConformant, Unversionize, Versionize};

use sqlx::postgres::PgConnectOptions;
use std::fmt;
use std::str::FromStr;

use crate::types::FhevmError;

pub const SAFE_SER_DESER_LIMIT: u64 = 1024 * 1024 * 16;
Expand Down Expand Up @@ -110,3 +114,125 @@ impl Default for HeartBeat {
Self::new()
}
}
/// Simple wrapper around Database URL string to provide
/// url constraints and masking functionality.
#[derive(Clone)]
pub struct DatabaseURL(String);

impl From<&str> for DatabaseURL {
fn from(s: &str) -> Self {
let url = s.to_owned();
let app_name = Self::default_app_name();
Self::new_with_app_name(&url, &app_name)
}
}
impl From<String> for DatabaseURL {
fn from(s: String) -> Self {
let url = s.to_owned();
let app_name = Self::default_app_name();
Self::new_with_app_name(&url, &app_name)
}
}

impl Default for DatabaseURL {
fn default() -> Self {
let url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://postgres:postgres@localhost:5432/coprocessor".to_owned());

let app_name = Self::default_app_name();
Self::new_with_app_name(&url, &app_name)
}
}

impl DatabaseURL {
/// Create a new DatabaseURL, appending application_name if not present
/// If the base URL already contains an application_name, it will be preserved.
///
/// application_name is useful for identifying the source of DB conns
pub fn new_with_app_name(base: &str, app_name: &str) -> Self {
let app_name = app_name.trim();
if app_name.is_empty() {
return Self(base.to_owned());
}

// Append application_name if not present
let mut url = base.to_owned();
if !url.contains("application_name=") {
if url.contains('?') {
url.push_str(&format!("&application_name={}", app_name));
} else {
url.push_str(&format!("?application_name={}", app_name));
}
}
let url: Self = Self(url);
let _ = url.parse().expect("DatabaseURL should be valid");
url
}

/// Get default app name from the executable name
fn default_app_name() -> String {
std::env::args()
.next()
.and_then(|path| {
std::path::Path::new(&path)
.file_name()
.map(|s| s.to_string_lossy().into_owned())
})
.unwrap_or_default()
}

pub fn as_str(&self) -> &str {
self.0.as_str()
}

pub fn into_inner(self) -> String {
self.0
}

fn mask_password(options: &PgConnectOptions) -> String {
let new_url = format!(
"postgres://{}:{}@{}:{}/{}?application_name={}",
options.get_username(),
"*****",
options.get_host(),
options.get_port(),
options.get_database().unwrap_or_default(),
options.get_application_name().unwrap_or_default()
);
new_url
}

pub fn parse(&self) -> Result<PgConnectOptions, sqlx::Error> {
PgConnectOptions::from_str(self.as_str())
}
}

impl fmt::Display for DatabaseURL {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match PgConnectOptions::from_str(self.as_str()) {
Ok(options) => {
write!(f, "{:?}", Self::mask_password(&options))
}
Err(_) => write!(f, "Invalid DatabaseURL"),
}
}
}

impl fmt::Debug for DatabaseURL {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match PgConnectOptions::from_str(self.as_str()) {
Ok(options) => {
write!(f, "{:?}", options.password("*****"))
}
Err(_) => write!(f, "Invalid DatabaseURL"),
}
}
}
impl FromStr for DatabaseURL {
type Err = sqlx::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let _ = PgConnectOptions::from_str(s)?;
Ok(Self(s.to_owned()))
}
}
38 changes: 38 additions & 0 deletions coprocessor/fhevm-engine/fhevm-engine-common/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use fhevm_engine_common::utils::DatabaseURL;

#[tokio::test]
async fn mask_database_url() {
let db_url: DatabaseURL = "postgres://postgres:mypassword@localhost:5432/coprocessor".into();

let debug_fmt = format!("{:?}", db_url);
assert!(!debug_fmt.contains("mypassword"));

let display_fmt = format!("{}", db_url);
assert!(!display_fmt.contains("mypassword"));
println!("DatabaseURL: {}", db_url);

let db_url: DatabaseURL = DatabaseURL::new_with_app_name(
"postgres://user:secret@dbhost:5432/mydb?sslmode=require",
"tfhe-worker",
);

assert_eq!(
db_url.as_str(),
"postgres://user:secret@dbhost:5432/mydb?sslmode=require&application_name=tfhe-worker"
);

let db_url: DatabaseURL =
DatabaseURL::new_with_app_name("postgres://user:secret@dbhost:5432/mydb", "tfhe-worker");

assert_eq!(
db_url.as_str(),
"postgres://user:secret@dbhost:5432/mydb?application_name=tfhe-worker"
);

println!("DatabaseURL: {}", db_url);

let db_url: DatabaseURL =
DatabaseURL::new_with_app_name("postgres://user:secret@dbhost:5432/mydb", " ");

assert_eq!(db_url.as_str(), "postgres://user:secret@dbhost:5432/mydb");
}
14 changes: 5 additions & 9 deletions coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use alloy::providers::{ProviderBuilder, WsConnect};
use alloy::{primitives::Address, transports::http::reqwest::Url};
use clap::Parser;
use fhevm_engine_common::{metrics_server, telemetry};
use fhevm_engine_common::{metrics_server, telemetry, utils::DatabaseURL};
use gw_listener::aws_s3::AwsS3Client;
use gw_listener::chain_id_from_env;
use gw_listener::gw_listener::GatewayListener;
Expand All @@ -18,7 +18,7 @@ use tracing::{error, info, Level};
#[command(version, about, long_about = None)]
struct Conf {
#[arg(long)]
database_url: Option<String>,
database_url: Option<DatabaseURL>,

#[arg(long, default_value_t = 16)]
database_pool_size: u32,
Expand Down Expand Up @@ -105,18 +105,14 @@ async fn main() -> anyhow::Result<()> {
.with_max_level(conf.log_level)
.init();

info!(conf = ?conf, "Starting gw_listener");

if !conf.service_name.is_empty() {
if let Err(err) = telemetry::setup_otlp(&conf.service_name) {
error!(error = %err, "Failed to setup OTLP");
}
}

let database_url = conf
.database_url
.clone()
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined"));
info!(gateway_url = %conf.gw_url, max_retries = %conf.provider_max_retries,
retry_interval = ?conf.provider_retry_interval, "Connecting to Gateway");

let provider = loop {
match ProviderBuilder::new()
Expand Down Expand Up @@ -152,7 +148,7 @@ async fn main() -> anyhow::Result<()> {
};
let config = ConfigSettings {
host_chain_id,
database_url,
database_url: conf.database_url.clone().unwrap_or_default(),
database_pool_size: conf.database_pool_size,
verify_proof_req_db_channel: conf.verify_proof_req_database_channel,
gw_url: conf.gw_url,
Expand Down
4 changes: 2 additions & 2 deletions coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
);
let db_pool = PgPoolOptions::new()
.max_connections(self.conf.database_pool_size)
.connect(&self.conf.database_url)
.connect(self.conf.database_url.as_str())
.await?;

let input_verification_handle = {
Expand Down Expand Up @@ -549,7 +549,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
// Check database connection
let db_pool_result = PgPoolOptions::new()
.max_connections(self.conf.database_pool_size)
.connect(&self.conf.database_url)
.connect(self.conf.database_url.as_str())
.await;

match db_pool_result {
Expand Down
6 changes: 3 additions & 3 deletions coprocessor/fhevm-engine/gw-listener/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use alloy::primitives::Uint;
use alloy::transports::http::reqwest::Url;
use fhevm_engine_common::utils::DatabaseURL;
use std::time::Duration;

use tracing::error;
Expand Down Expand Up @@ -36,7 +37,7 @@ impl TryFrom<u8> for KeyType {
#[derive(Clone, Debug)]
pub struct ConfigSettings {
pub host_chain_id: ChainId,
pub database_url: String,
pub database_url: DatabaseURL,
pub database_pool_size: u32,
pub verify_proof_req_db_channel: String,

Expand Down Expand Up @@ -67,8 +68,7 @@ impl Default for ConfigSettings {
fn default() -> Self {
Self {
host_chain_id: chain_id_from_env().unwrap_or(12345),
database_url: std::env::var("DATABASE_URL")
.unwrap_or("postgres://postgres:postgres@localhost/coprocessor".to_owned()),
database_url: DatabaseURL::default(),
database_pool_size: 16,
verify_proof_req_db_channel: "event_zkpok_new_work".to_owned(),
gw_url: "ws://127.0.0.1:8546".try_into().expect("Invalid URL"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ impl TestEnvironment {
.await
.expect("valid db instance");
eprintln!("New test database on {}", instance.db_url());
conf.database_url = instance.db_url().to_owned();
conf.database_url = instance.db_url.clone();
_test_instance = Some(instance);
};
conf.error_sleep_initial_secs = 1;
conf.error_sleep_max_secs = 1;
let db_pool = PgPoolOptions::new()
.max_connections(16)
.acquire_timeout(Duration::from_secs(5))
.connect(&conf.database_url)
.connect(conf.database_url.as_str())
.await?;

// Delete all proofs from the database.
Expand Down
6 changes: 3 additions & 3 deletions coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio_util::sync::CancellationToken;

use fhevm_engine_common::healthz_server::HttpServer as HealthHttpServer;
use fhevm_engine_common::types::{BlockchainProvider, Handle};
use fhevm_engine_common::utils::HeartBeat;
use fhevm_engine_common::utils::{DatabaseURL, HeartBeat};

use crate::contracts::{AclContract, TfheContract};
use crate::database::tfhe_event_propagate::{
Expand Down Expand Up @@ -58,7 +58,7 @@ pub struct Args {
long,
default_value = "postgresql://postgres:postgres@localhost:5432/coprocessor"
)]
pub database_url: String,
pub database_url: DatabaseURL,

#[arg(long, default_value = None, help = "Can be negative from last block", allow_hyphen_values = true)]
pub start_at_block: Option<i64>,
Expand Down Expand Up @@ -970,7 +970,7 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
let mut log_iter = InfiniteLogIter::new(&args);
let chain_id = log_iter.get_chain_id().await?;
info!(chain_id = chain_id, "Chain ID");
if args.database_url.is_empty() {
if args.database_url.as_str().is_empty() {
error!("Database URL is required");
panic!("Database URL is required");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::Result;
use fhevm_engine_common::telemetry;
use fhevm_engine_common::types::AllowEvents;
use fhevm_engine_common::types::SupportedFheOperations;
use fhevm_engine_common::utils::DatabaseURL;
use fhevm_engine_common::utils::{compact_hex, HeartBeat};
use sqlx::postgres::PgConnectOptions;
use sqlx::postgres::PgPoolOptions;
Expand Down Expand Up @@ -70,7 +71,7 @@ pub fn retry_on_sqlx_error(err: &SqlxError, retry_count: &mut usize) -> bool {

// A pool of connection with some cached information and automatic reconnection
pub struct Database {
url: String,
url: DatabaseURL,
pub pool: Arc<RwLock<sqlx::Pool<Postgres>>>,
pub tenant_id: TenantId,
pub chain_id: ChainId,
Expand All @@ -90,7 +91,7 @@ pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;

impl Database {
pub async fn new(
url: &str,
url: &DatabaseURL,
coprocessor_api_key: &CoprocessorApiKey,
bucket_cache_size: u16,
) -> Result<Self> {
Expand All @@ -105,7 +106,7 @@ impl Database {
.into(),
));
Ok(Database {
url: url.into(),
url: url.clone(),
tenant_id,
chain_id,
pool: Arc::new(RwLock::new(pool)),
Expand All @@ -114,7 +115,7 @@ impl Database {
})
}

async fn new_pool(url: &str) -> PgPool {
async fn new_pool(url: &DatabaseURL) -> PgPool {
let options: PgConnectOptions = url.parse().expect("bad url");
let options = options.options([
("statement_timeout", "10000"), // 5 seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async fn setup(node_chain_id: Option<u64>) -> Result<Setup, anyhow::Error> {
initial_block_time: 1,
acl_contract_address: acl_contract.address().to_string(),
tfhe_contract_address: tfhe_contract.address().to_string(),
database_url: test_instance.db_url().to_string(),
database_url: test_instance.db_url.clone(),
coprocessor_api_key: Some(coprocessor_api_key),
start_at_block: None,
end_at_block: None,
Expand Down
5 changes: 1 addition & 4 deletions coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ fn handle_sigint(token: CancellationToken) {
fn construct_config() -> Config {
let args: utils::daemon_cli::Args = utils::daemon_cli::parse_args();

let db_url = args
.database_url
.clone()
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined"));
let db_url = args.database_url.clone().unwrap_or_default();

Config {
tenant_api_key: args.tenant_api_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use clap::{command, Parser};
use fhevm_engine_common::telemetry::MetricsConfig;
use fhevm_engine_common::utils::DatabaseURL;
use humantime::parse_duration;
use sns_worker::{SchedulePolicy, SNS_LATENCY_OP_HISTOGRAM_CONF};
use tracing::Level;
Expand Down Expand Up @@ -48,7 +49,7 @@ pub struct Args {
/// Postgres database url. If unspecified DATABASE_URL environment variable
/// is used
#[arg(long)]
pub database_url: Option<String>,
pub database_url: Option<DatabaseURL>,

/// KeySet file. If unspecified the the keys are read from the database
#[arg(long)]
Expand Down
Loading
Loading