Skip to content

Commit d6ffb71

Browse files
authored
chore(coprocessor): add more metrics for sns-worker (#1369)
Also, move code around to put metrics in one place.
1 parent 94ebc0f commit d6ffb71

File tree

10 files changed

+203
-30
lines changed

10 files changed

+203
-30
lines changed

charts/coprocessor/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: coprocessor
22
description: A helm chart to distribute and deploy Zama fhevm Co-Processor services
3-
version: 0.7.2
3+
version: 0.7.3
44
apiVersion: v2
55
keywords:
66
- fhevm

charts/coprocessor/values.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,10 +601,12 @@ snsWorker:
601601
- --cleanup-interval=120s
602602
- --liveness-threshold=70s
603603
- --lifo=false
604-
- --enable-compssion=true
604+
- --enable-compression=true
605605
- --schedule-policy=rayon_parallel
606606
- --service-name=sns-worker
607607
- --log-level=INFO
608+
# Pass `--gauge-update-interval-secs` to only some of the workers: every worker that has it set polls the DB on that interval, so enabling it everywhere adds DB load and publishes duplicate gauge values.
609+
# - --gauge-update-interval-secs=10
608610
# - --pg-auto-explain-with-min-duration="30s"
609611
# - --keys-file-path
610612

coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::metrics::{AWS_UPLOAD_FAILURE_COUNTER, AWS_UPLOAD_SUCCESS_COUNTER};
12
use crate::{
23
BigCiphertext, Ciphertext128Format, Config, ExecutionError, HandleItem, S3Config, UploadJob,
34
};
@@ -166,16 +167,19 @@ async fn run_uploader_loop(
166167
let h = tokio::spawn(async move {
167168
let s = item.otel.child_span("upload_s3");
168169
match upload_ciphertexts(trx, item, &client, &conf).instrument(error_span!("upload_s3")).await {
169-
Ok(()) => telemetry::end_span(s),
170+
Ok(()) => {
171+
telemetry::end_span(s);
172+
AWS_UPLOAD_SUCCESS_COUNTER.inc();
173+
}
170174
Err(err) => {
171175
if let ExecutionError::S3TransientError(_) = err {
172176
ready_flag.store(false, Ordering::Release);
173177
info!(error = %err, "S3 setup is not ready, due to transient error");
174178
} else {
175179
error!(error = %err, "Failed to upload ciphertexts");
176180
}
177-
178181
telemetry::end_span_with_err(s, err.to_string());
182+
AWS_UPLOAD_FAILURE_COUNTER.inc();
179183
}
180184
}
181185
drop(permit);

coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use sns_worker::{Config, DBConfig, HealthCheckConfig, S3Config, S3RetryPolicy};
1+
use sns_worker::{Config, DBConfig, HealthCheckConfig, S3Config, S3RetryPolicy, SNSMetricsConfig};
22

33
use tokio::signal::unix;
44
use tokio_util::sync::CancellationToken;
@@ -21,7 +21,10 @@ fn construct_config() -> Config {
2121
Config {
2222
tenant_api_key: args.tenant_api_key,
2323
service_name: args.service_name,
24-
metrics_addr: args.metrics_addr,
24+
metrics: SNSMetricsConfig {
25+
addr: args.metrics_addr,
26+
gauge_update_interval_secs: args.gauge_update_interval_secs,
27+
},
2528
db: DBConfig {
2629
url: db_url,
2730
listen_channels: args.pg_listen_channels,

coprocessor/fhevm-engine/sns-worker/src/bin/utils/daemon_cli.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use clap::{command, Parser};
44
use fhevm_engine_common::telemetry::MetricsConfig;
55
use fhevm_engine_common::utils::DatabaseURL;
66
use humantime::parse_duration;
7-
use sns_worker::{SchedulePolicy, SNS_LATENCY_OP_HISTOGRAM_CONF};
7+
use sns_worker::metrics::SNS_LATENCY_OP_HISTOGRAM_CONF;
8+
use sns_worker::SchedulePolicy;
89
use tracing::Level;
910

1011
#[derive(Parser, Debug, Clone)]
@@ -128,6 +129,9 @@ pub struct Args {
128129
/// Prometheus metrics: coprocessor_sns_op_latency_seconds
129130
#[arg(long, default_value = "0.1:10.0:0.1", value_parser = clap::value_parser!(MetricsConfig))]
130131
pub metric_sns_op_latency: MetricsConfig,
132+
133+
#[arg(long, value_parser = clap::value_parser!(u32).range(1..))]
134+
pub gauge_update_interval_secs: Option<u32>,
131135
}
132136

133137
pub fn parse_args() -> Args {

coprocessor/fhevm-engine/sns-worker/src/executor.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use crate::aws_upload::check_is_ready;
22
use crate::keyset::fetch_keyset;
3+
use crate::metrics::SNS_LATENCY_OP_HISTOGRAM;
4+
use crate::metrics::TASK_EXECUTE_FAILURE_COUNTER;
5+
use crate::metrics::TASK_EXECUTE_SUCCESS_COUNTER;
36
use crate::squash_noise::SquashNoiseCiphertext;
47
use crate::BigCiphertext;
58
use crate::Ciphertext128Format;
@@ -8,7 +11,6 @@ use crate::InternalEvents;
811
use crate::KeySet;
912
use crate::SchedulePolicy;
1013
use crate::UploadJob;
11-
use crate::SNS_LATENCY_OP_HISTOGRAM;
1214
use crate::{Config, ExecutionError};
1315
use aws_sdk_s3::Client;
1416
use fhevm_engine_common::healthz_server::{HealthCheckService, HealthStatus, Version};
@@ -238,7 +240,15 @@ pub(crate) async fn run_loop(
238240
continue;
239241
};
240242

241-
let maybe_remaining = fetch_and_execute_sns_tasks(&pool, &tx, keys, &conf, &token).await?;
243+
let (maybe_remaining, _tasks_processed) =
244+
fetch_and_execute_sns_tasks(&pool, &tx, keys, &conf, &token)
245+
.await
246+
.inspect(|(_, tasks_processed)| {
247+
TASK_EXECUTE_SUCCESS_COUNTER.inc_by(*tasks_processed as u64);
248+
})
249+
.inspect_err(|_| {
250+
TASK_EXECUTE_FAILURE_COUNTER.inc();
251+
})?;
242252
if maybe_remaining {
243253
if token.is_cancelled() {
244254
return Ok(());
@@ -317,13 +327,14 @@ pub async fn garbage_collect(pool: &PgPool, limit: u32) -> Result<(), ExecutionE
317327
}
318328

319329
/// Fetch and process SnS tasks from the database.
330+
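/// Returns `(maybe_remaining, tasks_processed)` on success: `maybe_remaining` is true when the fetched batch was full (its length equals `conf.db.batch_limit`), and `tasks_processed` is the number of tasks fetched in this batch (0 when the query returns none).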
320331
async fn fetch_and_execute_sns_tasks(
321332
pool: &PgPool,
322333
tx: &Sender<UploadJob>,
323334
keys: &KeySet,
324335
conf: &Config,
325336
token: &CancellationToken,
326-
) -> Result<bool, ExecutionError> {
337+
) -> Result<(bool, usize), ExecutionError> {
327338
let mut db_txn = match pool.begin().await {
328339
Ok(txn) => txn,
329340
Err(err) => {
@@ -341,8 +352,10 @@ async fn fetch_and_execute_sns_tasks(
341352
let trx = &mut db_txn;
342353

343354
let mut maybe_remaining = false;
355+
let tasks_processed;
344356
if let Some(mut tasks) = query_sns_tasks(trx, conf.db.batch_limit, order).await? {
345357
maybe_remaining = conf.db.batch_limit as usize == tasks.len();
358+
tasks_processed = tasks.len();
346359

347360
let t = telemetry::tracer("batch_execution", &None);
348361
t.set_attribute("count", tasks.len().to_string());
@@ -375,10 +388,11 @@ async fn fetch_and_execute_sns_tasks(
375388
}
376389
}
377390
} else {
391+
tasks_processed = 0;
378392
db_txn.rollback().await?;
379393
}
380394

381-
Ok(maybe_remaining)
395+
Ok((maybe_remaining, tasks_processed))
382396
}
383397

384398
/// Queries the database for a fixed number of tasks.

coprocessor/fhevm-engine/sns-worker/src/lib.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ mod executor;
33
mod keyset;
44
mod squash_noise;
55

6+
pub mod metrics;
7+
68
#[cfg(test)]
79
mod tests;
810

@@ -21,7 +23,6 @@ use fhevm_engine_common::{
2123
metrics_server,
2224
pg_pool::{PostgresPoolManager, ServiceError},
2325
telemetry::{self, OtelTracer},
24-
telemetry::{register_histogram, MetricsConfig},
2526
types::FhevmError,
2627
utils::{to_hex, DatabaseURL},
2728
};
@@ -43,11 +44,9 @@ use tracing::{error, info, Level};
4344
use crate::{
4445
aws_upload::{check_is_ready, spawn_resubmit_task, spawn_uploader},
4546
executor::SwitchNSquashService,
47+
metrics::spawn_gauge_update_routine,
4648
};
4749

48-
use prometheus::Histogram;
49-
use std::sync::{LazyLock, OnceLock};
50-
5150
pub const UPLOAD_QUEUE_SIZE: usize = 20;
5251
pub const SAFE_SER_LIMIT: u64 = 1024 * 1024 * 66;
5352
pub type InternalEvents = Option<tokio::sync::mpsc::Sender<&'static str>>;
@@ -75,6 +74,12 @@ pub struct DBConfig {
7574
pub lifo: bool,
7675
}
7776

77+
#[derive(Clone, Default, Debug)]
78+
pub struct SNSMetricsConfig {
79+
pub addr: Option<String>,
80+
pub gauge_update_interval_secs: Option<u32>,
81+
}
82+
7883
#[derive(Clone, Default, Debug)]
7984
pub struct S3Config {
8085
pub bucket_ct128: String,
@@ -106,7 +111,7 @@ pub struct Config {
106111
pub s3: S3Config,
107112
pub log_level: Level,
108113
pub health_checks: HealthCheckConfig,
109-
pub metrics_addr: Option<String>,
114+
pub metrics: SNSMetricsConfig,
110115
pub enable_compression: bool,
111116
pub schedule_policy: SchedulePolicy,
112117
pub pg_auto_explain_with_min_duration: Option<Duration>,
@@ -390,9 +395,6 @@ pub async fn run_computation_loop(
390395
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
391396
let port = conf.health_checks.port;
392397

393-
// Start metrics server
394-
metrics_server::spawn(conf.metrics_addr.clone(), token.child_token());
395-
396398
let service = Arc::new(
397399
SwitchNSquashService::create(
398400
pool_mngr,
@@ -532,6 +534,21 @@ pub async fn run_all(
532534

533535
let pg_mngr = pool_mngr.clone();
534536

537+
// Start metrics server
538+
metrics_server::spawn(conf.metrics.addr.clone(), token.child_token());
539+
540+
// Start gauge update routine.
541+
if let Some(interval_secs) = conf.metrics.gauge_update_interval_secs {
542+
info!(
543+
interval_secs = interval_secs,
544+
"Starting gauge update routine"
545+
);
546+
spawn_gauge_update_routine(
547+
Duration::from_secs(interval_secs.into()),
548+
pg_mngr.pool().clone(),
549+
);
550+
}
551+
535552
// Spawns a task to handle S3 uploads
536553
spawn(async move {
537554
if let Err(err) = run_uploader_loop(&pg_mngr, &conf, jobs_rx, tx, s3, is_ready).await {
@@ -552,12 +569,3 @@ pub async fn run_all(
552569

553570
Ok(())
554571
}
555-
556-
pub static SNS_LATENCY_OP_HISTOGRAM_CONF: OnceLock<MetricsConfig> = OnceLock::new();
557-
pub static SNS_LATENCY_OP_HISTOGRAM: LazyLock<Histogram> = LazyLock::new(|| {
558-
register_histogram(
559-
SNS_LATENCY_OP_HISTOGRAM_CONF.get(),
560-
"coprocessor_sns_op_latency_seconds",
561-
"Squash_noise computation latencies in seconds",
562-
)
563-
});
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use std::sync::{LazyLock, OnceLock};
2+
3+
use fhevm_engine_common::telemetry::{register_histogram, MetricsConfig};
4+
use prometheus::{register_int_counter, IntCounter};
5+
use prometheus::{register_int_gauge, Histogram, IntGauge};
6+
use sqlx::PgPool;
7+
use tokio::task::JoinHandle;
8+
use tokio::time::sleep;
9+
use tracing::{error, info};
10+
11+
pub static SNS_LATENCY_OP_HISTOGRAM_CONF: OnceLock<MetricsConfig> = OnceLock::new();
12+
pub(crate) static SNS_LATENCY_OP_HISTOGRAM: LazyLock<Histogram> = LazyLock::new(|| {
13+
register_histogram(
14+
SNS_LATENCY_OP_HISTOGRAM_CONF.get(),
15+
"coprocessor_sns_op_latency_seconds",
16+
"Squash_noise computation latencies in seconds",
17+
)
18+
});
19+
20+
pub(crate) static TASK_EXECUTE_SUCCESS_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
21+
register_int_counter!(
22+
"coprocessor_sns_worker_task_execute_success_counter",
23+
"Number of successful task execute operations in sns-worker (including persistence to DB)"
24+
)
25+
.unwrap()
26+
});
27+
28+
pub(crate) static TASK_EXECUTE_FAILURE_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
29+
register_int_counter!(
30+
"coprocessor_sns_worker_task_execute_failure_counter",
31+
"Number of failed task execute operations in sns-worker (including persistence to DB)"
32+
)
33+
.unwrap()
34+
});
35+
36+
pub(crate) static AWS_UPLOAD_SUCCESS_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
37+
register_int_counter!(
38+
"coprocessor_sns_worker_aws_upload_success_counter",
39+
"Number of successful AWS uploads in sns-worker"
40+
)
41+
.unwrap()
42+
});
43+
44+
pub(crate) static AWS_UPLOAD_FAILURE_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
45+
register_int_counter!(
46+
"coprocessor_sns_worker_aws_upload_failure_counter",
47+
"Number of failed AWS uploads in sns-worker"
48+
)
49+
.unwrap()
50+
});
51+
52+
pub(crate) static UNCOMPLETE_TASKS: LazyLock<IntGauge> = LazyLock::new(|| {
53+
register_int_gauge!(
54+
"coprocessor_sns_worker_uncomplete_tasks_gauge",
55+
"Number of uncomplete tasks in sns-worker"
56+
)
57+
.unwrap()
58+
});
59+
60+
pub(crate) static UNCOMPLETE_AWS_UPLOADS: LazyLock<IntGauge> = LazyLock::new(|| {
61+
register_int_gauge!(
62+
"coprocessor_sns_worker_uncomplete_aws_uploads_gauge",
63+
"Number of uncomplete AWS uploads in sns-worker"
64+
)
65+
.unwrap()
66+
});
67+
68+
pub fn spawn_gauge_update_routine(period: std::time::Duration, db_pool: PgPool) -> JoinHandle<()> {
69+
tokio::spawn(async move {
70+
loop {
71+
match sqlx::query_scalar(
72+
"SELECT COUNT(*) FROM pbs_computations WHERE is_completed = FALSE",
73+
)
74+
.fetch_one(&db_pool)
75+
.await
76+
{
77+
Ok(count) => {
78+
info!(uncomplete_tasks = %count, "Fetched uncomplete tasks count");
79+
UNCOMPLETE_TASKS.set(count);
80+
}
81+
Err(e) => {
82+
error!(error = %e, "Failed to fetch uncomplete tasks count");
83+
}
84+
}
85+
86+
match sqlx::query_scalar(
87+
"SELECT COUNT(*) FROM ciphertext_digest WHERE (ciphertext128 IS NULL OR ciphertext IS NULL)",
88+
)
89+
.fetch_one(&db_pool)
90+
.await
91+
{
92+
Ok(count) => {
93+
info!(uncomplete_aws_uploads = %count, "Fetched uncomplete AWS uploads count");
94+
UNCOMPLETE_AWS_UPLOADS.set(count);
95+
}
96+
Err(e) => {
97+
error!(error = %e, "Failed to fetch uncomplete AWS uploads count");
98+
}
99+
}
100+
101+
sleep(period).await;
102+
}
103+
})
104+
}

coprocessor/fhevm-engine/sns-worker/src/tests/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,6 @@ fn build_test_config(url: DatabaseURL, enable_compression: bool) -> Config {
755755
enable_compression,
756756
schedule_policy,
757757
pg_auto_explain_with_min_duration: Some(Duration::from_secs(1)),
758-
metrics_addr: None,
758+
metrics: Default::default(),
759759
}
760760
}

0 commit comments

Comments
 (0)