Skip to content

Commit 3a03ea4

Browse files
committed
chore(coprocessor): use DatabaseURL across all services
1 parent c82a658 commit 3a03ea4

File tree

27 files changed

+68
-90
lines changed

27 files changed

+68
-90
lines changed

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33
use alloy::providers::{ProviderBuilder, WsConnect};
44
use alloy::{primitives::Address, transports::http::reqwest::Url};
55
use clap::Parser;
6-
use fhevm_engine_common::{metrics_server, telemetry};
6+
use fhevm_engine_common::{metrics_server, telemetry, utils::DatabaseURL};
77
use gw_listener::aws_s3::AwsS3Client;
88
use gw_listener::chain_id_from_env;
99
use gw_listener::gw_listener::GatewayListener;
@@ -18,7 +18,7 @@ use tracing::{error, info, Level};
1818
#[command(version, about, long_about = None)]
1919
struct Conf {
2020
#[arg(long)]
21-
database_url: Option<String>,
21+
database_url: Option<DatabaseURL>,
2222

2323
#[arg(long, default_value_t = 16)]
2424
database_pool_size: u32,
@@ -105,18 +105,14 @@ async fn main() -> anyhow::Result<()> {
105105
.with_max_level(conf.log_level)
106106
.init();
107107

108-
info!(conf = ?conf, "Starting gw_listener");
109-
110108
if !conf.service_name.is_empty() {
111109
if let Err(err) = telemetry::setup_otlp(&conf.service_name) {
112110
error!(error = %err, "Failed to setup OTLP");
113111
}
114112
}
115113

116-
let database_url = conf
117-
.database_url
118-
.clone()
119-
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined"));
114+
info!(gateway_url = %conf.gw_url, max_retries = %conf.provider_max_retries,
115+
retry_interval = ?conf.provider_retry_interval, "Connecting to Gateway");
120116

121117
let provider = loop {
122118
match ProviderBuilder::new()
@@ -152,7 +148,7 @@ async fn main() -> anyhow::Result<()> {
152148
};
153149
let config = ConfigSettings {
154150
host_chain_id,
155-
database_url,
151+
database_url: conf.database_url.clone().unwrap_or_default(),
156152
database_pool_size: conf.database_pool_size,
157153
verify_proof_req_db_channel: conf.verify_proof_req_database_channel,
158154
gw_url: conf.gw_url,

coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
8181
);
8282
let db_pool = PgPoolOptions::new()
8383
.max_connections(self.conf.database_pool_size)
84-
.connect(&self.conf.database_url)
84+
.connect(self.conf.database_url.as_str())
8585
.await?;
8686

8787
let input_verification_handle = {
@@ -549,7 +549,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
549549
// Check database connection
550550
let db_pool_result = PgPoolOptions::new()
551551
.max_connections(self.conf.database_pool_size)
552-
.connect(&self.conf.database_url)
552+
.connect(self.conf.database_url.as_str())
553553
.await;
554554

555555
match db_pool_result {

coprocessor/fhevm-engine/gw-listener/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use alloy::primitives::Uint;
22
use alloy::transports::http::reqwest::Url;
3+
use fhevm_engine_common::utils::DatabaseURL;
34
use std::time::Duration;
45

56
use tracing::error;
@@ -36,7 +37,7 @@ impl TryFrom<u8> for KeyType {
3637
#[derive(Clone, Debug)]
3738
pub struct ConfigSettings {
3839
pub host_chain_id: ChainId,
39-
pub database_url: String,
40+
pub database_url: DatabaseURL,
4041
pub database_pool_size: u32,
4142
pub verify_proof_req_db_channel: String,
4243

@@ -67,8 +68,7 @@ impl Default for ConfigSettings {
6768
fn default() -> Self {
6869
Self {
6970
host_chain_id: chain_id_from_env().unwrap_or(12345),
70-
database_url: std::env::var("DATABASE_URL")
71-
.unwrap_or("postgres://postgres:postgres@localhost/coprocessor".to_owned()),
71+
database_url: DatabaseURL::default(),
7272
database_pool_size: 16,
7373
verify_proof_req_db_channel: "event_zkpok_new_work".to_owned(),
7474
gw_url: "ws://127.0.0.1:8546".try_into().expect("Invalid URL"),

coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,15 @@ impl TestEnvironment {
132132
.await
133133
.expect("valid db instance");
134134
eprintln!("New test database on {}", instance.db_url());
135-
conf.database_url = instance.db_url().to_owned();
135+
conf.database_url = instance.db_url.clone();
136136
_test_instance = Some(instance);
137137
};
138138
conf.error_sleep_initial_secs = 1;
139139
conf.error_sleep_max_secs = 1;
140140
let db_pool = PgPoolOptions::new()
141141
.max_connections(16)
142142
.acquire_timeout(Duration::from_secs(5))
143-
.connect(&conf.database_url)
143+
.connect(conf.database_url.as_str())
144144
.await?;
145145

146146
// Delete all proofs from the database.

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use tokio_util::sync::CancellationToken;
2424

2525
use fhevm_engine_common::healthz_server::HttpServer as HealthHttpServer;
2626
use fhevm_engine_common::types::{BlockchainProvider, Handle};
27-
use fhevm_engine_common::utils::HeartBeat;
27+
use fhevm_engine_common::utils::{DatabaseURL, HeartBeat};
2828

2929
use crate::contracts::{AclContract, TfheContract};
3030
use crate::database::tfhe_event_propagate::{
@@ -58,7 +58,7 @@ pub struct Args {
5858
long,
5959
default_value = "postgresql://postgres:postgres@localhost:5432/coprocessor"
6060
)]
61-
pub database_url: String,
61+
pub database_url: DatabaseURL,
6262

6363
#[arg(long, default_value = None, help = "Can be negative from last block", allow_hyphen_values = true)]
6464
pub start_at_block: Option<i64>,
@@ -970,7 +970,7 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
970970
let mut log_iter = InfiniteLogIter::new(&args);
971971
let chain_id = log_iter.get_chain_id().await?;
972972
info!(chain_id = chain_id, "Chain ID");
973-
if args.database_url.is_empty() {
973+
if args.database_url.as_str().is_empty() {
974974
error!("Database URL is required");
975975
panic!("Database URL is required");
976976
};

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use anyhow::Result;
55
use fhevm_engine_common::telemetry;
66
use fhevm_engine_common::types::AllowEvents;
77
use fhevm_engine_common::types::SupportedFheOperations;
8+
use fhevm_engine_common::utils::DatabaseURL;
89
use fhevm_engine_common::utils::{compact_hex, HeartBeat};
910
use sqlx::postgres::PgConnectOptions;
1011
use sqlx::postgres::PgPoolOptions;
@@ -70,7 +71,7 @@ pub fn retry_on_sqlx_error(err: &SqlxError, retry_count: &mut usize) -> bool {
7071

7172
// A pool of connection with some cached information and automatic reconnection
7273
pub struct Database {
73-
url: String,
74+
url: DatabaseURL,
7475
pub pool: Arc<RwLock<sqlx::Pool<Postgres>>>,
7576
pub tenant_id: TenantId,
7677
pub chain_id: ChainId,
@@ -90,7 +91,7 @@ pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;
9091

9192
impl Database {
9293
pub async fn new(
93-
url: &str,
94+
url: &DatabaseURL,
9495
coprocessor_api_key: &CoprocessorApiKey,
9596
bucket_cache_size: u16,
9697
) -> Result<Self> {
@@ -105,7 +106,7 @@ impl Database {
105106
.into(),
106107
));
107108
Ok(Database {
108-
url: url.into(),
109+
url: url.clone(),
109110
tenant_id,
110111
chain_id,
111112
pool: Arc::new(RwLock::new(pool)),
@@ -114,7 +115,7 @@ impl Database {
114115
})
115116
}
116117

117-
async fn new_pool(url: &str) -> PgPool {
118+
async fn new_pool(url: &DatabaseURL) -> PgPool {
118119
let options: PgConnectOptions = url.parse().expect("bad url");
119120
let options = options.options([
120121
("statement_timeout", "10000"), // 5 seconds

coprocessor/fhevm-engine/host-listener/tests/integration_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ async fn setup(node_chain_id: Option<u64>) -> Result<Setup, anyhow::Error> {
217217
initial_block_time: 1,
218218
acl_contract_address: acl_contract.address().to_string(),
219219
tfhe_contract_address: tfhe_contract.address().to_string(),
220-
database_url: test_instance.db_url().to_string(),
220+
database_url: test_instance.db_url.clone(),
221221
coprocessor_api_key: Some(coprocessor_api_key),
222222
start_at_block: None,
223223
end_at_block: None,

coprocessor/fhevm-engine/sns-worker/src/lib.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,6 @@ pub struct DBConfig {
7575
pub lifo: bool,
7676
}
7777

78-
impl std::fmt::Debug for DBConfig {
79-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80-
// Custom debug impl to avoid printing sensitive information
81-
write!(
82-
f,
83-
"db_listen_channel: {:?}, db_notify_channel: {}, db_batch_limit: {}, db_gc_batch_limit: {}, db_polling_interval: {}, db_cleanup_interval: {:?}, db_max_connections: {}, db_timeout: {:?}, lifo: {}",
84-
self.listen_channels, self.notify_channel, self.batch_limit, self.gc_batch_limit, self.polling_interval, self.cleanup_interval, self.max_connections, self.timeout, self.lifo
85-
)
86-
}
87-
}
88-
8978
#[derive(Clone, Default, Debug)]
9079
pub struct S3Config {
9180
pub bucket_ct128: String,
@@ -109,7 +98,7 @@ pub struct HealthCheckConfig {
10998
pub port: u16,
11099
}
111100

112-
#[derive(Clone, Debug)]
101+
#[derive(Clone)]
113102
pub struct Config {
114103
pub tenant_api_key: String,
115104
pub service_name: String,
@@ -510,7 +499,7 @@ pub async fn run_all(
510499
mpsc::channel::<UploadJob>(10 * config.s3.max_concurrent_uploads as usize);
511500

512501
let rayon_threads = rayon::current_num_threads();
513-
info!(config = ?config, rayon_threads, "Starting SNS worker");
502+
info!(config = %config, rayon_threads, "Starting SNS worker");
514503

515504
if !config.service_name.is_empty() {
516505
if let Err(err) = telemetry::setup_otlp(&config.service_name) {

coprocessor/fhevm-engine/sns-worker/src/tests/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use aws_config::BehaviorVersion;
99
use fhevm_engine_common::utils::{compact_hex, DatabaseURL};
1010
use serde::{Deserialize, Serialize};
1111
use serial_test::serial;
12-
use sqlx::Database;
1312
use std::{
1413
fs::File,
1514
io::{Read, Write},

coprocessor/fhevm-engine/stress-test-generator/data/json/minitest_002_erc20.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
"user_address": "0xa0534e99d86F081E8D3868A8C4732C8f65dfdB07",
1010
"scenario": [
1111
[
12-
20.0,
13-
120
12+
0.1,
13+
5
1414
]
1515
]
1616
}
17-
]
17+
]

0 commit comments

Comments
 (0)