Skip to content

Commit

Permalink
Implement drop trait (#1)
Browse files Browse the repository at this point in the history
* implement drop trait

Signed-off-by: Dan Bond <[email protected]>

* tidy req receives

Signed-off-by: Dan Bond <[email protected]>

* add atomic stop bool

Signed-off-by: Dan Bond <[email protected]>

* bump version

Signed-off-by: Dan Bond <[email protected]>

* Arc::clone

Signed-off-by: Dan Bond <[email protected]>

* add comments

Signed-off-by: Dan Bond <[email protected]>

* formatting

Signed-off-by: Dan Bond <[email protected]>

* don't block when receiving requests

Signed-off-by: Dan Bond <[email protected]>

* store server

Signed-off-by: Dan Bond <[email protected]>

* remove serve

Signed-off-by: Dan Bond <[email protected]>

* no mut

Signed-off-by: Dan Bond <[email protected]>

* fields -> shared

Signed-off-by: Dan Bond <[email protected]>

* comments

Signed-off-by: Dan Bond <[email protected]>

* update docs

Signed-off-by: Dan Bond <[email protected]>

* swap -> store

Signed-off-by: Dan Bond <[email protected]>
  • Loading branch information
loshz authored Apr 4, 2022
1 parent 8122c18 commit 1cd08ec
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 87 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "metrics_server"
version = "0.2.0"
version = "0.3.0"
authors = ["Dan Bond <[email protected]>"]
edition = "2021"
rust-version = "1.58"
Expand Down
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
# Metrics Server
# metrics_server
[![CI](https://github.com/loshz/metrics_server/actions/workflows/ci.yml/badge.svg)](https://github.com/loshz/metrics_server/actions/workflows/ci.yml)
[![Version](https://img.shields.io/crates/v/metrics_server.svg)](https://crates.io/crates/metrics_server)
[![Docs](https://docs.rs/metrics_server/badge.svg)](https://docs.rs/metrics_server)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/loshz/metrics_server/blob/main/LICENSE)

A hassle-free, single-responsibility, safe HTTP server used to easily expose metrics in an application.

This crate provides a thread safe, minimalstic HTTP server used to buffer metrics and serve them via a standard `/metrics` endpoint. It's aim is to remove the boilerplate needed to create such simple mechanisms. It is currently somewhat oppinionated and naive in order to maintain little complexity.

## Usage

Include the lib in your `Cargo.toml` dependencies:
```toml
[dependencies]
metrics_server = "0.2"
metrics_server = "0.3"
```

In your application:
```rust
use metrics_server::MetricsServer;

// Create a new server and start it in the background.
let server = MetricsServer::new();
server.serve("localhost:8001");
// Create a new server and start listening for requests in the background.
let server = MetricsServer::new("localhost:8001");

// Publish you application metrics periodically.
// Publish you application metrics.
let bytes = server.update(Vec::from([1, 2, 3, 4]));
assert_eq!(bytes, 4);
```

For more comprehensive usage, check out the included [examples](./examples).
For more comprehensive usage, see the included [examples](./examples).
7 changes: 3 additions & 4 deletions examples/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ fn main() {
registry.register("some_count", "Number of random counts", counter.clone());

// Expose the Prometheus metrics.
let server = MetricsServer::new();
server.serve("localhost:8001");
let server = MetricsServer::new("localhost:8001");

// Increment the counter every 5 seconds.
loop {
Expand All @@ -30,7 +29,7 @@ fn main() {
// Update the Metrics Server with the current encoded data.
server.update(encoded);

let ten_secs = time::Duration::from_secs(5);
thread::sleep(ten_secs);
let five_secs = time::Duration::from_secs(5);
thread::sleep(five_secs);
}
}
9 changes: 4 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
//! ```
//! use metrics_server::MetricsServer;
//!
//! // Create a new server and start it in the background.
//! let server = MetricsServer::new();
//! server.serve("localhost:8001");
//! // Create a new server and start listening for requests in the background.
//! let server = MetricsServer::new("localhost:8001");
//!
//! // Publish you application metrics periodically.
//! // Publish your application metrics.
//! let bytes = server.update(Vec::from([1, 2, 3, 4]));
//! assert_eq!(bytes, 4);
//! assert_eq!(4, bytes);
//! ```
mod server;

Expand Down
136 changes: 70 additions & 66 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,24 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

use tiny_http::{Method, Response, Server};

/// A thread-safe growable array.
#[derive(Clone)]
pub struct MetricsServer(Arc<Mutex<Vec<u8>>>);
/// A thread-safe datastore for serving metrics via a HTTP server.
pub struct MetricsServer {
shared: Arc<MetricsServerShared>,
thread: Option<thread::JoinHandle<()>>,
}

impl Default for MetricsServer {
fn default() -> Self {
Self::new()
}
struct MetricsServerShared {
data: Mutex<Vec<u8>>,
server: Server,
stop: AtomicBool,
}

impl MetricsServer {
/// Creates a new empty `MetricsServer`.
///
/// This will create a mutex protected empty Vector. It will not allocate.
///
/// # Examples
///
/// ```
/// use metrics_server::MetricsServer;
///
/// let server = MetricsServer::new();
/// ```
pub fn new() -> Self {
MetricsServer(Arc::new(Mutex::new(Vec::new())))
}

/// Safely updates the data in a `MetricsServer` and returns the number of
/// bytes written.
///
/// This function is thread safe and protected by a mutex. It is safe
/// to call concurrently from multiple threads.
///
/// # Examples
///
/// ```
/// use metrics_server::MetricsServer;
///
/// let server = MetricsServer::new();
/// let bytes = server.update(Vec::from([1, 2, 3, 4]));
/// assert_eq!(bytes, 4);
/// ```
pub fn update(&self, data: Vec<u8>) -> usize {
let mut buf = self.0.lock().unwrap();
*buf = data;
buf.as_slice().len()
}

/// Starts a simple HTTP server on a new thread at the given address and expose the stored metrics.
/// This server is intended to only be queried synchronously as it blocks upon receiving
/// each request.
Expand All @@ -59,60 +28,95 @@ impl MetricsServer {
/// ```
/// use metrics_server::MetricsServer;
///
/// let server = MetricsServer::new();
/// server.serve("localhost:8001");
/// let server = MetricsServer::new("localhost:8001");
/// ```
///
/// # Panics
///
/// Panics if given an invalid address.
pub fn serve(&self, addr: &str) {
// Create a new HTTP server and bind to the given address.
let server = Server::http(addr).unwrap();
pub fn new(addr: &str) -> Self {
let shared = Arc::new(MetricsServerShared {
data: Mutex::new(Vec::new()),
server: Server::http(addr).unwrap(),
stop: AtomicBool::new(false),
});

// Invoking clone on Arc produces a new Arc instance, which points to the
// same allocation on the heap as the source Arc, while increasing a reference count.
let buf = Arc::clone(&self.0);
let s = Arc::clone(&shared);

// Handle requests in a new thread so we can process in the background.
thread::spawn({
let thread = Some(thread::spawn({
move || {
loop {
// Blocks until the next request is received.
let req = match server.recv() {
Ok(req) => req,
Err(e) => {
eprintln!("error: {}", e);
continue;
}
};
// Blocks until the next request is received.
for req in s.server.incoming_requests() {
// Check to see if we should stop handling requests.
if s.stop.load(Ordering::Relaxed) {
break;
}

// Only respond to GET requests.
if req.method() != &Method::Get {
let res = Response::empty(405);
if let Err(e) = req.respond(res) {
eprintln!("{}", e);
};
let _ = req.respond(res);
continue;
}

// Only serve the /metrics path.
if req.url() != "/metrics" {
let res = Response::empty(404);
if let Err(e) = req.respond(res) {
eprintln!("{}", e);
};
let _ = req.respond(res);
continue;
}

// Write the metrics to the response buffer.
let metrics = buf.lock().unwrap();
let metrics = s.data.lock().unwrap();
let res = Response::from_data(metrics.as_slice());
if let Err(e) = req.respond(res) {
eprintln!("{}", e);
eprintln!("metrics_server error: {}", e);
};
}
}
});
}));

MetricsServer { shared, thread }
}

/// Safely updates the data in a `MetricsServer` and returns the number of
/// bytes written.
///
/// This method is protected by a mutex making it safe
/// to call concurrently from multiple threads.
///
/// # Examples
///
/// ```
/// use metrics_server::MetricsServer;
///
/// let server = MetricsServer::new("localhost:8001");
/// let bytes = server.update(Vec::from([1, 2, 3, 4]));
/// assert_eq!(4, bytes);
/// ```
pub fn update(&self, data: Vec<u8>) -> usize {
let mut buf = self.shared.data.lock().unwrap();
*buf = data;
buf.as_slice().len()
}
}

impl Drop for MetricsServer {
// TODO: should I really be doing this inside drop? It _could_ panic,
// so maybe a shutdown method would be better?
fn drop(&mut self) {
// Signal that we should stop handling requests and unblock the server.
self.shared.stop.store(true, Ordering::Relaxed);
self.shared.server.unblock();

// Because join takes ownership of the thread, we need call the take method
// on the Option to move the value out of the Some variant and leave a None
// variant in its place.
if let Some(thread) = self.thread.take() {
thread.join().unwrap();
}
}
}
6 changes: 2 additions & 4 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ use metrics_server::MetricsServer;
#[test]
#[should_panic]
fn test_server_invalid_address() {
let server = MetricsServer::new();
server.serve("invalid:99999999");
let _ = MetricsServer::new("invalid:99999999");
}

#[test]
fn test_server_serve() {
let server = MetricsServer::new();
server.serve("localhost:8001");
let server = MetricsServer::new("localhost:8001");

for i in 0..3 {
// Create mock data and update the metrics server.
Expand Down

0 comments on commit 1cd08ec

Please sign in to comment.