From 1cd08ecb796f6095dea86f23a11005b5f049c33d Mon Sep 17 00:00:00 2001 From: Dan Bond Date: Mon, 4 Apr 2022 09:18:02 -0700 Subject: [PATCH] Implement drop trait (#1) * implement drop trait Signed-off-by: Dan Bond * tidy req receives Signed-off-by: Dan Bond * add atomic stop bool Signed-off-by: Dan Bond * bump version Signed-off-by: Dan Bond * Arc::clone Signed-off-by: Dan Bond * add comments Signed-off-by: Dan Bond * formatting Signed-off-by: Dan Bond * don't block when receiving requests Signed-off-by: Dan Bond * store server Signed-off-by: Dan Bond * remove serve Signed-off-by: Dan Bond * no mut Signed-off-by: Dan Bond * fields -> shared Signed-off-by: Dan Bond * comments Signed-off-by: Dan Bond * update docs Signed-off-by: Dan Bond * swap -> store Signed-off-by: Dan Bond --- Cargo.toml | 2 +- README.md | 15 ++--- examples/prometheus.rs | 7 +-- src/lib.rs | 9 ++- src/server.rs | 136 +++++++++++++++++++++-------------------- tests/server.rs | 6 +- 6 files changed, 88 insertions(+), 87 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b7470c1..564b222 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "metrics_server" -version = "0.2.0" +version = "0.3.0" authors = ["Dan Bond "] edition = "2021" rust-version = "1.58" diff --git a/README.md b/README.md index fe88b54..90d1be1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Metrics Server +# metrics_server [![CI](https://github.com/loshz/metrics_server/actions/workflows/ci.yml/badge.svg)](https://github.com/loshz/metrics_server/actions/workflows/ci.yml) [![Version](https://img.shields.io/crates/v/metrics_server.svg)](https://crates.io/crates/metrics_server) [![Docs](https://docs.rs/metrics_server/badge.svg)](https://docs.rs/metrics_server) @@ -6,25 +6,26 @@ A hassle-free, single-responsibility, safe HTTP server used to easily expose metrics in an application. +This crate provides a thread safe, minimalstic HTTP server used to buffer metrics and serve them via a standard `/metrics` endpoint. It's aim is to remove the boilerplate needed to create such simple mechanisms. It is currently somewhat oppinionated and naive in order to maintain little complexity. + ## Usage Include the lib in your `Cargo.toml` dependencies: ```toml [dependencies] -metrics_server = "0.2" +metrics_server = "0.3" ``` In your application: ```rust use metrics_server::MetricsServer; -// Create a new server and start it in the background. -let server = MetricsServer::new(); -server.serve("localhost:8001"); +// Create a new server and start listening for requests in the background. +let server = MetricsServer::new("localhost:8001"); -// Publish you application metrics periodically. +// Publish you application metrics. let bytes = server.update(Vec::from([1, 2, 3, 4])); assert_eq!(bytes, 4); ``` -For more comprehensive usage, check out the included [examples](./examples). +For more comprehensive usage, see the included [examples](./examples). diff --git a/examples/prometheus.rs b/examples/prometheus.rs index f14d7cd..f29b06a 100644 --- a/examples/prometheus.rs +++ b/examples/prometheus.rs @@ -16,8 +16,7 @@ fn main() { registry.register("some_count", "Number of random counts", counter.clone()); // Expose the Prometheus metrics. - let server = MetricsServer::new(); - server.serve("localhost:8001"); + let server = MetricsServer::new("localhost:8001"); // Increment the counter every 5 seconds. loop { @@ -30,7 +29,7 @@ fn main() { // Update the Metrics Server with the current encoded data. server.update(encoded); - let ten_secs = time::Duration::from_secs(5); - thread::sleep(ten_secs); + let five_secs = time::Duration::from_secs(5); + thread::sleep(five_secs); } } diff --git a/src/lib.rs b/src/lib.rs index 77152df..86bb5eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,13 +12,12 @@ //! ``` //! use metrics_server::MetricsServer; //! -//! // Create a new server and start it in the background. -//! let server = MetricsServer::new(); -//! server.serve("localhost:8001"); +//! // Create a new server and start listening for requests in the background. +//! let server = MetricsServer::new("localhost:8001"); //! -//! // Publish you application metrics periodically. +//! // Publish your application metrics. //! let bytes = server.update(Vec::from([1, 2, 3, 4])); -//! assert_eq!(bytes, 4); +//! assert_eq!(4, bytes); //! ``` mod server; diff --git a/src/server.rs b/src/server.rs index 2ceff0e..02598d8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,55 +1,24 @@ +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use tiny_http::{Method, Response, Server}; -/// A thread-safe growable array. -#[derive(Clone)] -pub struct MetricsServer(Arc>>); +/// A thread-safe datastore for serving metrics via a HTTP server. +pub struct MetricsServer { + shared: Arc, + thread: Option>, +} -impl Default for MetricsServer { - fn default() -> Self { - Self::new() - } +struct MetricsServerShared { + data: Mutex>, + server: Server, + stop: AtomicBool, } impl MetricsServer { /// Creates a new empty `MetricsServer`. /// - /// This will create a mutex protected empty Vector. It will not allocate. - /// - /// # Examples - /// - /// ``` - /// use metrics_server::MetricsServer; - /// - /// let server = MetricsServer::new(); - /// ``` - pub fn new() -> Self { - MetricsServer(Arc::new(Mutex::new(Vec::new()))) - } - - /// Safely updates the data in a `MetricsServer` and returns the number of - /// bytes written. - /// - /// This function is thread safe and protected by a mutex. It is safe - /// to call concurrently from multiple threads. - /// - /// # Examples - /// - /// ``` - /// use metrics_server::MetricsServer; - /// - /// let server = MetricsServer::new(); - /// let bytes = server.update(Vec::from([1, 2, 3, 4])); - /// assert_eq!(bytes, 4); - /// ``` - pub fn update(&self, data: Vec) -> usize { - let mut buf = self.0.lock().unwrap(); - *buf = data; - buf.as_slice().len() - } - /// Starts a simple HTTP server on a new thread at the given address and expose the stored metrics. /// This server is intended to only be queried synchronously as it blocks upon receiving /// each request. @@ -59,60 +28,95 @@ impl MetricsServer { /// ``` /// use metrics_server::MetricsServer; /// - /// let server = MetricsServer::new(); - /// server.serve("localhost:8001"); + /// let server = MetricsServer::new("localhost:8001"); /// ``` /// /// # Panics /// /// Panics if given an invalid address. - pub fn serve(&self, addr: &str) { - // Create a new HTTP server and bind to the given address. - let server = Server::http(addr).unwrap(); + pub fn new(addr: &str) -> Self { + let shared = Arc::new(MetricsServerShared { + data: Mutex::new(Vec::new()), + server: Server::http(addr).unwrap(), + stop: AtomicBool::new(false), + }); // Invoking clone on Arc produces a new Arc instance, which points to the // same allocation on the heap as the source Arc, while increasing a reference count. - let buf = Arc::clone(&self.0); + let s = Arc::clone(&shared); // Handle requests in a new thread so we can process in the background. - thread::spawn({ + let thread = Some(thread::spawn({ move || { - loop { - // Blocks until the next request is received. - let req = match server.recv() { - Ok(req) => req, - Err(e) => { - eprintln!("error: {}", e); - continue; - } - }; + // Blocks until the next request is received. + for req in s.server.incoming_requests() { + // Check to see if we should stop handling requests. + if s.stop.load(Ordering::Relaxed) { + break; + } // Only respond to GET requests. if req.method() != &Method::Get { let res = Response::empty(405); - if let Err(e) = req.respond(res) { - eprintln!("{}", e); - }; + let _ = req.respond(res); continue; } // Only serve the /metrics path. if req.url() != "/metrics" { let res = Response::empty(404); - if let Err(e) = req.respond(res) { - eprintln!("{}", e); - }; + let _ = req.respond(res); continue; } // Write the metrics to the response buffer. - let metrics = buf.lock().unwrap(); + let metrics = s.data.lock().unwrap(); let res = Response::from_data(metrics.as_slice()); if let Err(e) = req.respond(res) { - eprintln!("{}", e); + eprintln!("metrics_server error: {}", e); }; } } - }); + })); + + MetricsServer { shared, thread } + } + + /// Safely updates the data in a `MetricsServer` and returns the number of + /// bytes written. + /// + /// This method is protected by a mutex making it safe + /// to call concurrently from multiple threads. + /// + /// # Examples + /// + /// ``` + /// use metrics_server::MetricsServer; + /// + /// let server = MetricsServer::new("localhost:8001"); + /// let bytes = server.update(Vec::from([1, 2, 3, 4])); + /// assert_eq!(4, bytes); + /// ``` + pub fn update(&self, data: Vec) -> usize { + let mut buf = self.shared.data.lock().unwrap(); + *buf = data; + buf.as_slice().len() + } +} + +impl Drop for MetricsServer { + // TODO: should I really be doing this inside drop? It _could_ panic, + // so maybe a shutdown method would be better? + fn drop(&mut self) { + // Signal that we should stop handling requests and unblock the server. + self.shared.stop.store(true, Ordering::Relaxed); + self.shared.server.unblock(); + + // Because join takes ownership of the thread, we need call the take method + // on the Option to move the value out of the Some variant and leave a None + // variant in its place. + if let Some(thread) = self.thread.take() { + thread.join().unwrap(); + } } } diff --git a/tests/server.rs b/tests/server.rs index b61f207..d8d589b 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -3,14 +3,12 @@ use metrics_server::MetricsServer; #[test] #[should_panic] fn test_server_invalid_address() { - let server = MetricsServer::new(); - server.serve("invalid:99999999"); + let _ = MetricsServer::new("invalid:99999999"); } #[test] fn test_server_serve() { - let server = MetricsServer::new(); - server.serve("localhost:8001"); + let server = MetricsServer::new("localhost:8001"); for i in 0..3 { // Create mock data and update the metrics server.