diff --git a/Cargo.lock b/Cargo.lock
index fdd82082e..276cb6174 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -844,7 +844,7 @@ dependencies = [
  "num-traits",
  "serde",
  "wasm-bindgen",
- "windows-targets 0.52.0",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -1005,9 +1005,9 @@ dependencies = [
 
 [[package]]
 name = "core-foundation-sys"
-version = "0.8.6"
+version = "0.8.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
+checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
 
 [[package]]
 name = "core_affinity"
@@ -1053,6 +1053,25 @@ dependencies = [
  "cfg-if 1.0.0",
 ]
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.18"
@@ -1944,7 +1963,7 @@ dependencies = [
  "iana-time-zone-haiku",
  "js-sys",
  "wasm-bindgen",
- "windows-core",
+ "windows-core 0.52.0",
 ]
 
 [[package]]
@@ -2119,9 +2138,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.155"
+version = "0.2.169"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
+checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
 
 [[package]]
 name = "libm"
@@ -2380,6 +2399,15 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
 
+[[package]]
+name = "ntapi"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
+dependencies = [
+ "winapi",
+]
+
 [[package]]
 name = "nu-ansi-term"
 version = "0.46.0"
@@ -3013,6 +3041,26 @@ dependencies = [
  "getrandom",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "rc2"
 version = "0.8.1"
@@ -3659,6 +3707,7 @@ dependencies = [
  "sha2",
  "shlex",
  "strip-ansi-escapes",
+ "sysinfo",
  "syslog",
  "tar",
  "temp-env",
@@ -4090,6 +4139,20 @@ dependencies = [
  "syn 2.0.91",
 ]
 
+[[package]]
+name = "sysinfo"
+version = "0.33.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fc858248ea01b66f19d8e8a6d55f41deaf91e9d495246fd01368d99935c6c01"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+ "memchr",
+ "ntapi",
+ "rayon",
+ "windows",
+]
+
 [[package]]
 name = "syslog"
 version = "6.1.0"
@@ -5059,13 +5122,66 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
 
+[[package]]
+name = "windows"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143"
+dependencies = [
+ "windows-core 0.57.0",
+ "windows-targets 0.52.6",
+]
+
 [[package]]
 name = "windows-core"
 version = "0.52.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
 dependencies = [
- "windows-targets 0.52.0",
+ "windows-targets 0.52.6",
+]
+
+[[package]]
+name = "windows-core"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d"
+dependencies = [
+ "windows-implement",
+ "windows-interface",
+ "windows-result",
+ "windows-targets 0.52.6",
+]
+
+[[package]]
+name = "windows-implement"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.91",
+]
+
+[[package]]
+name = "windows-interface"
+version = "0.57.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.91",
+]
+
+[[package]]
+name = "windows-result"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8"
+dependencies = [
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -5083,7 +5199,7 @@ version = "0.52.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
 dependencies = [
- "windows-targets 0.52.0",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -5103,17 +5219,18 @@ dependencies = [
 
 [[package]]
 name = "windows-targets"
-version = "0.52.0"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd"
+checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
 dependencies = [
- "windows_aarch64_gnullvm 0.52.0",
- "windows_aarch64_msvc 0.52.0",
- "windows_i686_gnu 0.52.0",
- "windows_i686_msvc 0.52.0",
- "windows_x86_64_gnu 0.52.0",
- "windows_x86_64_gnullvm 0.52.0",
- "windows_x86_64_msvc 0.52.0",
+ "windows_aarch64_gnullvm 0.52.6",
+ "windows_aarch64_msvc 0.52.6",
+ "windows_i686_gnu 0.52.6",
+ "windows_i686_gnullvm",
+ "windows_i686_msvc 0.52.6",
+ "windows_x86_64_gnu 0.52.6",
+ "windows_x86_64_gnullvm 0.52.6",
+ "windows_x86_64_msvc 0.52.6",
 ]
 
 [[package]]
@@ -5124,9 +5241,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
 
 [[package]]
 name = "windows_aarch64_gnullvm"
-version = "0.52.0"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
+checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
 
 [[package]]
 name = "windows_aarch64_msvc"
@@ -5136,9 +5253,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
 
 [[package]]
 name = "windows_aarch64_msvc"
-version = "0.52.0"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
+checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
 
 [[package]]
 name = "windows_i686_gnu"
@@ -5148,9 +5265,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
 
 [[package]]
 name = "windows_i686_gnu"
-version = "0.52.0"
+version = "0.52.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
+
+[[package]]
+name = "windows_i686_gnullvm"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
+checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
 
 [[package]]
 name = "windows_i686_msvc"
@@ -5160,9 +5283,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
 
 [[package]]
 name = "windows_i686_msvc"
-version = "0.52.0"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
+checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
 
 [[package]]
 name = "windows_x86_64_gnu"
@@ -5172,9 +5295,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
 
 [[package]]
 name = "windows_x86_64_gnu"
-version = "0.52.0"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
+checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
 
 [[package]]
 name = "windows_x86_64_gnullvm"
@@ -5184,9 +5307,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
 
 [[package]]
 name = "windows_x86_64_gnullvm"
-version = "0.52.0"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
+checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
 
 [[package]]
 name = "windows_x86_64_msvc"
@@ -5196,9 +5319,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
 
 [[package]]
 name = "windows_x86_64_msvc"
-version = "0.52.0"
+version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
+checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
 
 [[package]]
 name = "winnow"
diff --git a/Cargo.toml b/Cargo.toml
index 0ca094a58..3067bdc5f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -139,6 +139,7 @@ rouille = { version = "3.6", optional = true, default-features = false, features = [
     "ssl",
 ] }
 syslog = { version = "6", optional = true }
+sysinfo = { version = "0.33", optional = true }
 tokio-openssl = { version = "0.6.5", optional = true }
 tokio-stream = { version = "0.1.17", optional = true }
 tokio-tungstenite = { version = "0.24", optional = true, features = [
@@ -245,6 +246,7 @@ dist-server = [
     "reqwest",
     "rouille",
     "syslog",
+    "sysinfo",
     "tower-http",
     "tokio-openssl",
     "tokio-stream",
diff --git a/src/bin/sccache-dist/main.rs b/src/bin/sccache-dist/main.rs
index e5257dac2..a7311a9e8 100644
--- a/src/bin/sccache-dist/main.rs
+++ b/src/bin/sccache-dist/main.rs
@@ -8,7 +8,6 @@ use celery::prelude::*;
 use celery::protocol::MessageContentType;
 use futures::lock::Mutex;
 use futures::{pin_mut, AsyncReadExt, FutureExt, TryFutureExt};
-use sccache::dist::http::{bincode_deserialize, bincode_serialize};
 use std::collections::HashMap;
 use std::env;
 
@@ -33,9 +32,11 @@ use sccache::{
         INSECURE_DIST_CLIENT_TOKEN,
     },
     dist::{
-        self, BuilderIncoming, CompileCommand, NewJobRequest, NewJobResponse, RunJobRequest,
-        RunJobResponse, SchedulerService, SchedulerStatusResult, ServerDetails, ServerService,
-        ServerStatusResult, ServerToolchains, SubmitToolchainResult, Toolchain,
+        self,
+        http::{bincode_deserialize, bincode_serialize},
+        BuilderIncoming, CompileCommand, JobStats, NewJobRequest, NewJobResponse, RunJobRequest,
+        RunJobResponse, SchedulerService, SchedulerStatus, ServerDetails, ServerService,
+        ServerStats, ServerStatus, ServerToolchains, SubmitToolchainResult, Toolchain,
     },
     errors::*,
     util::daemonize,
@@ -416,8 +417,12 @@ fn run(command: Command) -> Result<()> {
 
         let server = Arc::new(Server::new(
             server_id,
-            max_per_core_load,
-            occupancy,
+            dist::ServerStats {
+                num_cpus,
+                occupancy,
+                pre_fetch,
+                ..Default::default()
+            },
             job_queue.clone(),
             builder,
             task_queue.clone(),
@@ -576,20 +581,18 @@ async fn put_job_result(
 }
 
 #[derive(Clone, Debug)]
-struct ServerStatus {
-    pub max_per_core_load: f64,
-    pub mtime: Instant,
-    pub num_cpus: usize,
-    pub num_jobs_pending: usize,
-    pub num_jobs_running: usize,
+struct ServerInfo {
+    pub u_time: Instant,
+    pub info: ServerStats,
+    pub jobs: JobStats,
 }
 
 pub struct Scheduler {
     job_time_limit: u32,
     jobs_storage: Arc<dyn Storage>,
-    pending_jobs: Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<RunJobResponse>>>>,
+    jobs: Arc<Mutex<HashMap<String, tokio::sync::oneshot::Sender<RunJobResponse>>>>,
     queue_name: String,
-    servers: Arc<Mutex<HashMap<String, ServerStatus>>>,
+    servers: Arc<Mutex<HashMap<String, ServerInfo>>>,
     task_queue: Arc<Celery>,
     toolchains: Arc<dyn Storage>,
 }
@@ -605,7 +608,7 @@ impl Scheduler {
         Self {
             job_time_limit,
             jobs_storage,
-            pending_jobs: Arc::new(Mutex::new(HashMap::new())),
+            jobs: Arc::new(Mutex::new(HashMap::new())),
             servers: Arc::new(Mutex::new(HashMap::new())),
             task_queue,
             queue_name: to_this_scheduler_queue,
@@ -613,37 +616,51 @@ impl Scheduler {
         }
     }
 
-    fn prune_servers(servers: &mut HashMap<String, ServerStatus>) {
+    fn prune_servers(servers: &mut HashMap<String, ServerInfo>) {
         let now = Instant::now();
         // Prune servers we haven't seen in 90s
         let timeout = Duration::from_secs(90);
-        servers.retain(|_, server| now.duration_since(server.mtime) <= timeout);
+        servers.retain(|_, server| now.duration_since(server.u_time) <= timeout);
     }
 }
 
 #[async_trait]
 impl SchedulerService for Scheduler {
-    async fn get_status(&self) -> Result<SchedulerStatusResult> {
-        let mut servers = HashMap::<String, ServerStatusResult>::new();
+    async fn get_status(&self) -> Result<SchedulerStatus> {
+        let mut servers = vec![];
         for (server_id, server) in self.servers.lock().await.iter() {
-            servers.insert(
-                server_id.clone(),
-                ServerStatusResult {
-                    max_per_core_load: server.max_per_core_load,
-                    num_cpus: server.num_cpus,
-                    num_jobs_pending: server.num_jobs_pending,
-                    num_jobs_running: server.num_jobs_running,
-                    last_seen: server.mtime.elapsed().as_secs(),
-                },
-            );
+            servers.push(ServerStatus {
+                id: server_id.clone(),
+                info: server.info.clone(),
+                jobs: server.jobs.clone(),
+                u_time: server.u_time.elapsed().as_secs(),
+            });
         }
-        Ok(SchedulerStatusResult {
-            num_cpus: servers.values().map(|v| v.num_cpus).sum(),
-            num_jobs_pending: servers.values().map(|v| v.num_jobs_pending).sum(),
-            num_jobs_running: servers.values().map(|v| v.num_jobs_running).sum(),
-            num_servers: servers.len(),
+        Ok(SchedulerStatus {
+            info: Some(
+                servers
+                    .iter()
+                    .fold(dist::ServerStats::default(), |mut info, server| {
+                        info.cpu_usage += server.info.cpu_usage;
+                        info.mem_avail += server.info.mem_avail;
+                        info.mem_total += server.info.mem_total;
+                        info.num_cpus += server.info.num_cpus;
+                        info.occupancy += server.info.occupancy;
+                        info.pre_fetch += server.info.pre_fetch;
+                        info
+                    }),
+            )
+            .map(|mut info| {
+                info.cpu_usage /= servers.len() as f32;
+                info
+            })
+            .unwrap(),
+            jobs: dist::JobStats {
+                fetched: servers.iter().map(|server| server.jobs.fetched).sum(),
+                running: servers.iter().map(|server| server.jobs.running).sum(),
+            },
             servers,
         })
     }
@@ -707,7 +724,7 @@
         }
 
         let (tx, rx) = tokio::sync::oneshot::channel::<RunJobResponse>();
-        self.pending_jobs.lock().await.insert(job_id.clone(), tx);
+        self.jobs.lock().await.insert(job_id.clone(), tx);
 
         let res = self
             .task_queue
@@ -726,7 +743,7 @@
             .map_err(anyhow::Error::new);
 
         if let Err(err) = res {
-            self.pending_jobs.lock().await.remove(&job_id);
+            self.jobs.lock().await.remove(&job_id);
             Err(err)
         } else {
             rx.await.map_err(anyhow::Error::new)
@@ -734,7 +751,7 @@
     }
 
     async fn job_failure(&self, job_id: &str, reason: &str, info: ServerDetails) -> Result<()> {
-        let send_res = if let Some(sndr) = self.pending_jobs.lock().await.remove(job_id) {
+        let send_res = if let Some(sndr) = self.jobs.lock().await.remove(job_id) {
             sndr.send(RunJobResponse::JobFailed {
                 reason: reason.to_owned(),
             })
@@ -748,7 +765,7 @@
     }
 
     async fn job_success(&self, job_id: &str, info: ServerDetails) -> Result<()> {
-        let send_res = if let Some(sndr) = self.pending_jobs.lock().await.remove(job_id) {
+        let send_res = if let Some(sndr) = self.jobs.lock().await.remove(job_id) {
             get_job_result(self.jobs_storage.as_ref(), job_id)
                 .await
                 .context(format!("Failed to retrieve result for job {job_id}"))
@@ -765,35 +782,31 @@
         send_res.and(self.receive_status(info, Some(success)).await)
     }
 
-    async fn receive_status(&self, info: ServerDetails, job_status: Option<bool>) -> Result<()> {
+    async fn receive_status(&self, details: ServerDetails, job_status: Option<bool>) -> Result<()> {
         if let Some(success) = job_status {
             if success {
-                tracing::trace!("Received server success: {info:?}");
+                tracing::trace!("Received server success: {details:?}");
             } else {
-                tracing::trace!("Received server failure: {info:?}");
+                tracing::trace!("Received server failure: {details:?}");
             }
         } else {
-            tracing::trace!("Received server status: {info:?}");
+            tracing::trace!("Received server status: {details:?}");
         }
 
         let mut servers = self.servers.lock().await;
 
         // Insert or update the server info
         servers
-            .entry(info.server_id.clone())
+            .entry(details.id.clone())
             .and_modify(|server| {
-                server.max_per_core_load = info.max_per_core_load;
-                server.mtime = Instant::now();
-                server.num_cpus = info.num_cpus;
-                server.num_jobs_pending = info.num_jobs_pending;
-                server.num_jobs_running = info.num_jobs_running;
+                server.info = details.info.clone();
+                server.jobs = details.jobs.clone();
+                server.u_time = Instant::now();
             })
-            .or_insert_with(|| ServerStatus {
-                max_per_core_load: info.max_per_core_load,
-                mtime: Instant::now(),
-                num_cpus: info.num_cpus,
-                num_jobs_pending: info.num_jobs_pending,
-                num_jobs_running: info.num_jobs_running,
+            .or_insert_with(|| ServerInfo {
+                info: details.info.clone(),
+                jobs: details.jobs.clone(),
+                u_time: Instant::now(),
             });
 
         Self::prune_servers(&mut servers);
@@ -808,11 +821,11 @@ pub struct Server {
     jobs_storage: Arc<dyn Storage>,
     jobs: Arc<Mutex<HashMap<String, (String, String)>>>,
     job_queue: Arc<tokio::sync::Semaphore>,
-    max_per_core_load: f64,
-    num_cpus: usize,
     report_interval: Duration,
     schedulers: Arc<Mutex<HashMap<String, Instant>>>,
     server_id: String,
+    stats: dist::ServerStats,
+    sys: Arc<Mutex<sysinfo::System>>,
     task_queue: Arc<Celery>,
     toolchains: ServerToolchains,
 }
@@ -821,27 +834,31 @@ impl Server {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         server_id: String,
-        max_per_core_load: f64,
-        num_cpus: usize,
+        stats: dist::ServerStats,
         job_queue: Arc<tokio::sync::Semaphore>,
         builder: Arc<dyn BuilderIncoming>,
         task_queue: Arc<Celery>,
         jobs_storage: Arc<dyn Storage>,
         toolchains: ServerToolchains,
     ) -> Self {
+        use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
         Self {
             builder,
             jobs_storage,
             jobs: Default::default(),
             job_queue,
-            max_per_core_load,
-            num_cpus,
             // Report status at least every 30s
             report_interval: Duration::from_secs(30),
             schedulers: Default::default(),
             server_id,
+            stats,
             task_queue,
             toolchains,
+            sys: Arc::new(Mutex::new(System::new_with_specifics(
+                RefreshKind::nothing()
+                    .with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
+                    .with_memory(MemoryRefreshKind::nothing().with_ram()),
+            ))),
         }
     }
@@ -854,25 +871,41 @@
             .or_insert_with(Instant::now);
     }
 
+    fn get_details(
+        &self,
+        jobs: &HashMap<String, (String, String)>,
+        sys: &mut sysinfo::System,
+    ) -> ServerDetails {
+        let running = self.stats.occupancy - self.job_queue.available_permits();
+        let fetched = jobs.len() - running;
+
+        sys.refresh_cpu_specifics(sysinfo::CpuRefreshKind::nothing().with_cpu_usage());
+        sys.refresh_memory_specifics(sysinfo::MemoryRefreshKind::nothing().with_ram());
+
+        ServerDetails {
+            id: self.server_id.clone(),
+            info: dist::ServerStats {
+                cpu_usage: sys.global_cpu_usage(),
+                mem_avail: sys.available_memory(),
+                mem_total: sys.total_memory(),
+                num_cpus: self.stats.num_cpus,
+                occupancy: self.stats.occupancy,
+                pre_fetch: self.stats.pre_fetch,
+            },
+            jobs: dist::JobStats { fetched, running },
+        }
+    }
+
     async fn send_status(
         &self,
         respond_to: impl Into<Option<String>>,
-        num_jobs_pending: usize,
-        num_jobs_running: usize,
+        details: ServerDetails,
     ) -> Result<()> {
         let respond_to = respond_to.into();
 
-        let info = ServerDetails {
-            max_per_core_load: self.max_per_core_load,
-            num_cpus: self.num_cpus,
-            num_jobs_pending,
-            num_jobs_running,
-            server_id: self.server_id.clone(),
-        };
+        tracing::trace!("Reporting server status: {details:?}");
 
-        tracing::trace!("Reporting server status: {info:?}");
-
-        let task = server_to_schedulers::status::new(info);
+        let task = server_to_schedulers::status::new(details);
         let task = task.with_expires_in(60);
         let task = if let Some(ref respond_to) = respond_to {
             task.with_queue(respond_to)
@@ -894,11 +927,7 @@
         Ok(())
     }
 
-    async fn broadcast_status(
-        &self,
-        num_jobs_pending: usize,
-        num_jobs_running: usize,
-    ) -> Vec<Result<()>> {
+    async fn broadcast_status(&self, details: ServerDetails) -> Vec<Result<()>> {
         let ids = {
             let report_interval = self.report_interval;
             let mut schedulers = self.schedulers.lock().await;
@@ -925,7 +954,7 @@
         // 1. https://docs.celeryq.dev/en/stable/userguide/routing.html#broadcast
 
         let requests = ids.into_iter().fold(vec![], |mut futs, id| {
-            futs.push(self.send_status(Some(id), num_jobs_pending, num_jobs_running));
+            futs.push(self.send_status(Some(id), details.clone()));
             futs
         });
@@ -934,7 +963,7 @@
                 requests
             } else {
                 // If no schedulers to update, send status to any scheduler
-                vec![self.send_status(None, num_jobs_pending, num_jobs_running)]
+                vec![self.send_status(None, details.clone())]
             }
         })
         .await
@@ -956,12 +985,13 @@
         let this = Arc::new(self.clone());
         async move {
             loop {
-                let num_all_jobs = this.jobs.lock().await.len();
-                let num_free_cpus = this.job_queue.available_permits();
-                let num_jobs_running = this.num_cpus - num_free_cpus;
-                let num_jobs_pending = num_all_jobs - num_jobs_running;
+                let details = {
+                    let jobs = this.jobs.lock().await;
+                    let mut sys = this.sys.lock().await;
+                    this.get_details(&jobs, &mut sys)
+                };
                 tokio::time::sleep(
-                    this.broadcast_status(num_jobs_pending, num_jobs_running)
+                    this.broadcast_status(details)
                         .await
                         .iter()
                         .any(|res| res.is_err())
@@ -985,19 +1015,18 @@
         command: CompileCommand,
         outputs: Vec<String>,
     ) -> Result<()> {
-        let num_all_jobs = {
+        let details = {
             let mut jobs = self.jobs.lock().await;
+            let mut sys = self.sys.lock().await;
             // Associate the task with the scheduler and job so we can report success or failure
             jobs.insert(
                 task_id.to_owned(),
                 (respond_to.to_owned(), job_id.to_owned()),
             );
-            jobs.len()
+            self.get_details(&jobs, &mut sys)
         };
 
         let jobs_storage = self.jobs_storage.as_ref();
-        let num_jobs_running = self.num_cpus - self.job_queue.available_permits();
-        let num_jobs_pending = num_all_jobs - num_jobs_running;
 
         // Report status and load toolchain + inputs in parallel
         let (inputs, toolchain_dir, _) = futures::future::try_join3(
@@ -1009,11 +1038,7 @@
                 err
             }),
             // Report status
-            async {
-                Ok(self
-                    .broadcast_status(num_jobs_pending, num_jobs_running)
-                    .await)
-            },
+            async { Ok(self.broadcast_status(details).await) },
         )
         .await?;
@@ -1043,24 +1068,17 @@
     async fn job_failure(&self, task_id: &str, reason: &str) -> Result<()> {
         let mut jobs = self.jobs.lock().await;
         if let Some((respond_to, job_id)) = jobs.remove(task_id) {
-            let num_all_jobs = jobs.len();
-            let num_free_cpus = self.job_queue.available_permits();
-            let num_jobs_running = self.num_cpus - num_free_cpus;
-            let num_jobs_pending = num_all_jobs - num_jobs_running;
-
-            let info = ServerDetails {
-                max_per_core_load: self.max_per_core_load,
-                num_cpus: self.num_cpus,
-                num_jobs_pending,
-                num_jobs_running,
-                server_id: self.server_id.clone(),
+            // Get ServerDetails
+            let details = {
+                let mut sys = self.sys.lock().await;
+                self.get_details(&jobs, &mut sys)
             };
-
+            // Drop lock
             drop(jobs);
 
             self.task_queue
                 .send_task(
-                    server_to_schedulers::job_failure::new(job_id, reason.to_owned(), info)
+                    server_to_schedulers::job_failure::new(job_id, reason.to_owned(), details)
                         .with_queue(&respond_to)
                         .with_expires_in(60),
                 )
@@ -1081,24 +1099,17 @@
     async fn job_success(&self, task_id: &str) -> Result<()> {
         let mut jobs = self.jobs.lock().await;
         if let Some((respond_to, job_id)) = jobs.remove(task_id) {
-            let num_all_jobs = jobs.len();
-            let num_free_cpus = self.job_queue.available_permits();
-            let num_jobs_running = self.num_cpus - num_free_cpus;
-            let num_jobs_pending = num_all_jobs - num_jobs_running;
-
-            let info = ServerDetails {
-                max_per_core_load: self.max_per_core_load,
-                num_cpus: self.num_cpus,
-                num_jobs_pending,
-                num_jobs_running,
-                server_id: self.server_id.clone(),
+            // Get ServerDetails
+            let details = {
+                let mut sys = self.sys.lock().await;
+                self.get_details(&jobs, &mut sys)
             };
-
+            // Drop lock
             drop(jobs);
 
             self.task_queue
                 .send_task(
-                    server_to_schedulers::job_success::new(job_id, info)
+                    server_to_schedulers::job_success::new(job_id, details)
                         .with_queue(&respond_to)
                         .with_expires_in(60),
                 )
@@ -1145,7 +1156,7 @@
         let task_id = task.request.id.clone();
 
         tracing::debug!(
-            "[run_build({task_id}, {job_id}, {respond_to}, {}, {:?}, {:?}, {outputs:?})]",
+            "[run_job({task_id}, {job_id}, {respond_to}, {}, {:?}, {:?}, {outputs:?})]",
             toolchain.archive_id,
             command.executable,
             command.arguments,
@@ -1162,7 +1173,7 @@
         .await
         .map_err(|e| {
             let msg = format!("run_job failed with error: {e:?}");
-            tracing::error!("[run_build({job_id})]: {msg}");
+            tracing::error!("[run_job({job_id})]: {msg}");
             TaskError::UnexpectedError(msg)
         })
     }
@@ -1171,10 +1182,10 @@
         let task_id = task.request().id.clone();
         let reason = match err {
             TaskError::TimeoutError => {
-                format!("[run_build({task_id})]: run_build timed out")
+                format!("[run_job({task_id})]: run_job timed out")
             }
             _ => {
-                format!("[run_build({task_id})]: {err}")
+                format!("[run_job({task_id})]: {err}")
             }
         };
         if let Err(err) = super::SERVER
@@ -1210,8 +1221,12 @@ mod server_to_schedulers {
 
     use sccache::dist::ServerDetails;
 
+    // Limit retries. These are all tasks sent to a specific scheduler.
+    // If that scheduler goes offline, we want the broker to expire its
+    // messages instead of keeping them in the queue indefinitely.
+
     // Runs on scheduler to handle heartbeats from servers
-    #[celery::task]
+    #[celery::task(max_retries = 10)]
     pub async fn status(info: ServerDetails) -> TaskResult<()> {
         super::SCHEDULER
             .get()
@@ -1221,8 +1236,7 @@
             .map_err(|e| TaskError::UnexpectedError(e.to_string()))
     }
 
-    // Runs on the scheduler
-    #[celery::task]
+    #[celery::task(max_retries = 10)]
     pub async fn job_failure(
         job_id: String,
         reason: String,
@@ -1236,8 +1250,7 @@
             .map_err(|e| TaskError::UnexpectedError(e.to_string()))
     }
 
-    // Runs on the scheduler
-    #[celery::task]
+    #[celery::task(max_retries = 10)]
     pub async fn job_success(job_id: String, info: ServerDetails) -> TaskResult<()> {
         super::SCHEDULER
             .get()
diff --git a/src/compiler/compiler.rs b/src/compiler/compiler.rs
index 5474239b9..b8fd8d88d 100644
--- a/src/compiler/compiler.rs
+++ b/src/compiler/compiler.rs
@@ -3061,7 +3061,7 @@ LLVM version: 6.0",
 mod test_dist {
     use crate::dist::{
         self, CompileCommand, NewJobResponse, OutputData, PathTransformer, ProcessOutput,
-        RunJobResponse, SchedulerStatusResult, SubmitToolchainResult, Toolchain,
+        RunJobResponse, SchedulerStatus, SubmitToolchainResult, Toolchain,
     };
     use crate::dist::{pkg, BuildResult};
     use async_trait::async_trait;
@@ -3087,7 +3087,7 @@ mod test_dist {
         ) -> Result<(NewJobResponse, PathTransformer)> {
             unreachable!()
         }
-        async fn do_get_status(&self) -> Result<SchedulerStatusResult> {
+        async fn do_get_status(&self) -> Result<SchedulerStatus> {
             unreachable!()
         }
         async fn do_submit_toolchain(&self, _: Toolchain) -> Result<SubmitToolchainResult> {
@@ -3142,7 +3142,7 @@ mod test_dist {
            assert_eq!(self.tc, tc);
            Err(anyhow!("MOCK: alloc job failure"))
        }
-        async fn do_get_status(&self) -> Result<SchedulerStatusResult> {
+        async fn do_get_status(&self) -> Result<SchedulerStatus> {
            unreachable!()
        }
        async fn do_submit_toolchain(&self, _: Toolchain) -> Result<SubmitToolchainResult> {
@@ -3214,7 +3214,7 @@ mod test_dist {
                path_transformer,
            ))
        }
-        async fn do_get_status(&self) -> Result<SchedulerStatusResult> {
+        async fn do_get_status(&self) -> Result<SchedulerStatus> {
            unreachable!("fn do_get_status is not used for this test. qed")
        }
        async fn do_submit_toolchain(&self, tc: Toolchain) -> Result<SubmitToolchainResult> {
@@ -3287,7 +3287,7 @@ mod test_dist {
                path_transformer,
            ))
        }
-        async fn do_get_status(&self) -> Result<SchedulerStatusResult> {
+        async fn do_get_status(&self) -> Result<SchedulerStatus> {
            unreachable!()
        }
        async fn do_submit_toolchain(&self, tc: Toolchain) -> Result<SubmitToolchainResult> {
@@ -3373,7 +3373,7 @@ mod test_dist {
                path_transformer,
            ))
        }
-        async fn do_get_status(&self) -> Result<SchedulerStatusResult> {
+        async fn do_get_status(&self) -> Result<SchedulerStatus> {
            unreachable!("fn do_get_status is not used for this test. qed")
        }
        async fn do_submit_toolchain(&self, tc: Toolchain) -> Result<SubmitToolchainResult> {
diff --git a/src/dist/http.rs b/src/dist/http.rs
index 894451d18..b65d3cfb3 100644
--- a/src/dist/http.rs
+++ b/src/dist/http.rs
@@ -263,7 +263,7 @@ mod client {
     use crate::dist::pkg::{InputsPackager, ToolchainPackager};
     use crate::dist::{
         self, CompileCommand, NewJobRequest, NewJobResponse, PathTransformer, RunJobRequest,
-        RunJobResponse, SchedulerStatusResult, SubmitToolchainResult, Toolchain,
+        RunJobResponse, SchedulerStatus, SubmitToolchainResult, Toolchain,
     };
     use crate::util::new_reqwest_client;
 
@@ -412,7 +412,7 @@ mod client {
         bincode_req_fut(req).await
     }
 
-    async fn do_get_status(&self) -> Result<SchedulerStatusResult> {
+    async fn do_get_status(&self) -> Result<SchedulerStatus> {
         let req = self
             .client
             .lock()
diff --git a/src/dist/mod.rs b/src/dist/mod.rs
index e9fb58d60..2cc6c9853 100644
--- a/src/dist/mod.rs
+++ b/src/dist/mod.rs
@@ -510,34 +510,55 @@ pub struct Toolchain {
 
 // Status
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
+// Unfortunately bincode doesn't support #[serde(flatten)] :(
+// https://github.com/bincode-org/bincode/issues/245
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
-pub struct SchedulerStatusResult {
-    pub num_cpus: usize,
-    pub num_jobs_pending: usize,
-    pub num_jobs_running: usize,
-    pub num_servers: usize,
-    pub servers: std::collections::HashMap<String, ServerStatusResult>,
+pub struct SchedulerStatus {
+    // #[serde(flatten)]
+    pub info: ServerStats,
+    pub jobs: JobStats,
+    pub servers: Vec<ServerStatus>,
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
-pub struct ServerStatusResult {
-    pub last_seen: u64,
-    pub max_per_core_load: f64,
-    pub num_cpus: usize,
-    pub num_jobs_pending: usize,
-    pub num_jobs_running: usize,
+pub struct ServerStatus {
+    // #[serde(flatten)]
+    // pub details: ServerDetails,
+    pub id: String,
+    // #[serde(flatten)]
+    pub info: ServerStats,
+    pub jobs: JobStats,
+    pub u_time: u64,
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct ServerDetails {
-    pub max_per_core_load: f64,
+    pub id: String,
+    // #[serde(flatten)]
+    pub info: ServerStats,
+    pub jobs: JobStats,
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct ServerStats {
+    pub cpu_usage: f32,
+    pub mem_avail: u64,
+    pub mem_total: u64,
     pub num_cpus: usize,
-    pub num_jobs_pending: usize,
-    pub num_jobs_running: usize,
-    pub server_id: String,
+    pub occupancy: usize,
+    pub pre_fetch: usize,
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct JobStats {
+    pub fetched: usize,
+    pub running: usize,
 }
 
 // SubmitToolchain
@@ -556,7 +577,7 @@ pub enum SubmitToolchainResult {
 #[cfg(feature = "dist-server")]
 #[async_trait]
 pub trait SchedulerService: Send + Sync {
-    async fn get_status(&self) -> Result<SchedulerStatusResult>;
+    async fn get_status(&self) -> Result<SchedulerStatus>;
 
     async fn has_toolchain(&self, toolchain: Toolchain) -> bool;
 
@@ -628,7 +649,7 @@ pub trait Client: Send + Sync {
         outputs: Vec<String>,
     ) -> Result<RunJobResponse>;
     // To Scheduler
-    async fn do_get_status(&self) -> Result<SchedulerStatusResult>;
+    async fn do_get_status(&self) -> Result<SchedulerStatus>;
     // To Scheduler
     async fn do_submit_toolchain(&self, tc: Toolchain) -> Result<SubmitToolchainResult>;
     async fn put_toolchain(
diff --git a/src/dist/server.rs b/src/dist/server.rs
index 759a4060d..e50e6871b 100644
--- a/src/dist/server.rs
+++ b/src/dist/server.rs
@@ -74,11 +74,14 @@ mod internal {
             // application/octet-stream
             Some(0) => match bincode::serialize(&content) {
                 Ok(body) => Ok((StatusCode::OK, body).into_response()),
-                Err(err) => Err((
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                    format!("Failed to serialize response body: {err}").into_bytes(),
-                )
-                    .into_response()),
+                Err(err) => {
+                    tracing::error!("Failed to serialize response body: {err:?}");
+                    Err((
+                        StatusCode::INTERNAL_SERVER_ERROR,
+                        format!("Failed to serialize response body: {err}").into_bytes(),
+                    )
+                        .into_response())
+                }
             },
             // application/json
             Some(1) => Ok((
@@ -86,15 +89,21 @@ mod internal {
                 json!(content).as_str().unwrap().to_string().into_bytes(),
             )
                 .into_response()),
-            _ => Err((
-                StatusCode::BAD_REQUEST,
-                "Request must accept application/json or application/octet-stream"
-                    .to_string()
-                    .into_bytes(),
-            )
-                .into_response()),
+            _ => {
+                tracing::error!(
+                    "Request must accept application/json or application/octet-stream"
+                );
+                Err((
+                    StatusCode::BAD_REQUEST,
+                    "Request must accept application/json or application/octet-stream"
+                        .to_string()
+                        .into_bytes(),
+                )
+                    .into_response())
+            }
         }
     } else {
+        tracing::error!("Request must accept application/json or application/octet-stream");
         Err((
             StatusCode::BAD_REQUEST,
             "Request must accept application/json or application/octet-stream"
@@ -111,10 +120,7 @@ mod internal {
         uri: Uri,
     ) -> impl FnOnce(anyhow::Error) -> std::result::Result<Response, Response> {
         move |err: anyhow::Error| {
-            tracing::error!(
-                "{}",
-                format!("sccache: `{method} {uri}` failed with: {err:?}")
-            );
+            tracing::error!("sccache: `{method} {uri}` failed with: {err:?}");
             let msg = format!("sccache: `{method} {uri}` failed with: `{err}`");
             Err((StatusCode::INTERNAL_SERVER_ERROR, msg.into_bytes()).into_response())
         }
diff --git a/src/server.rs b/src/server.rs
index b099f1086..a2f2ec81c 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -381,7 +381,8 @@ impl DistClientContainer {
             Ok(res) => {
                 info!(
                     "Successfully created dist client with {:?} cores across {:?} servers",
-                    res.num_cpus, res.num_servers
+                    res.info.num_cpus,
+                    res.servers.len()
                 );
                 DistClientState::Some(Box::new(config), Arc::new(dist_client))
             }
@@ -1645,7 +1646,7 @@ pub enum DistInfo {
     #[cfg(feature = "dist-client")]
     NotConnected(Option<config::HTTPUrl>, String),
     #[cfg(feature = "dist-client")]
-    SchedulerStatus(Option<config::HTTPUrl>, dist::SchedulerStatusResult),
+    SchedulerStatus(Option<config::HTTPUrl>, dist::SchedulerStatus),
 }
 
 impl Default for ServerStats {
diff --git a/tests/harness/mod.rs b/tests/harness/mod.rs
index a0dd765c9..e33fab362 100644
--- a/tests/harness/mod.rs
+++ b/tests/harness/mod.rs
@@ -1,7 +1,7 @@
 use fs_err as fs;
 #[cfg(any(feature = "dist-client", feature = "dist-server"))]
 use sccache::config::HTTPUrl;
-use sccache::dist::{self, SchedulerStatusResult};
+use sccache::dist::{self, SchedulerStatus};
 use sccache::server::ServerInfo;
 use std::env;
 use std::io::Write;
@@ -372,11 +372,12 @@ impl DistSystem {
         let status = self.scheduler_status();
         if matches!(
             status,
-            SchedulerStatusResult {
-                num_cpus: _,
-                num_jobs_pending: 0,
-                num_jobs_running: 0,
-                num_servers: 0,
+            SchedulerStatus {
+                info: _,
+                jobs: dist::JobStats {
+                    fetched: 0,
+                    running: 0,
+                },
                 servers: _
             }
         ) {
@@ -564,7 +565,7 @@ impl DistSystem {
         HTTPUrl::from_url(reqwest::Url::parse(&url).unwrap())
     }
 
-    fn scheduler_status(&self) -> SchedulerStatusResult {
+    fn scheduler_status(&self) -> SchedulerStatus {
         let mut req = reqwest::blocking::Client::builder().build().unwrap().get(
             dist::http::urls::scheduler_status(&self.scheduler_url().to_url()),
         );
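
Reviewer note (not part of the patch): the substantive change above is that server load reporting switches from hand-counted semaphore permits to host CPU/memory sampled via the new `sysinfo` dependency. The sketch below isolates that sampling pattern using only the sysinfo 0.33 calls that appear in the diff (`new_with_specifics`, `refresh_cpu_specifics`, `refresh_memory_specifics`, `global_cpu_usage`, `available_memory`, `total_memory`); the `main` scaffolding around them is illustrative, not sccache code.

```rust
// Minimal standalone sketch of the sysinfo usage introduced in this diff
// (assumes sysinfo = "0.33").
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};

fn main() {
    // Track only CPU usage and RAM, mirroring Server::new.
    let mut sys = System::new_with_specifics(
        RefreshKind::nothing()
            .with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
            .with_memory(MemoryRefreshKind::nothing().with_ram()),
    );

    // CPU usage is computed as a delta between two refreshes, so wait at
    // least the minimum update interval before sampling again. The
    // long-lived `sys` held in `Server` gets this for free, since it is
    // refreshed once per status report rather than created fresh each time.
    std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
    sys.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage());
    sys.refresh_memory_specifics(MemoryRefreshKind::nothing().with_ram());

    // The same three readings get_details packs into ServerStats.
    println!(
        "cpu_usage={:.1}% mem_avail={}B mem_total={}B",
        sys.global_cpu_usage(),
        sys.available_memory(),
        sys.total_memory(),
    );
}
```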