Skip to content

Commit

Permalink
publish req logs in an Apache-like format
Browse files Browse the repository at this point in the history
Signed-off-by: Dan Bond <[email protected]>
  • Loading branch information
loshz committed May 12, 2024
1 parent d274b0d commit 493030c
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 54 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches:
- main
tags:
tags:
- '*'
pull_request:
branches:
Expand All @@ -22,7 +22,7 @@ jobs:
# Run linters.
- run: |
cargo fmt -- --check
cargo clippy --tests -- --no-deps -D warnings
cargo clippy --examples --tests -- --no-deps -D warnings
# Run all tests.
- run: cargo test --no-fail-fast --all-features
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "metrics_server"
version = "0.13.0"
version = "0.14.0"
authors = ["Dan Bond <[email protected]>"]
edition = "2021"
rust-version = "1.58"
rust-version = "1.63"
description = "A hassle-free, single-responsibility, safe HTTP/S server used to easily expose metrics in an application."
documentation = "https://docs.rs/metrics_server"
readme = "README.md"
Expand All @@ -18,14 +18,16 @@ include = ["src/**/*", "tests", "examples", "Cargo.toml", "LICENSE", "README.md"
doctest = false

[dependencies]
http = "1.1"
log = "0.4"
tiny_http = "0.12"
http = "0.2"
time = { version = "0.3", features = ["formatting"] }

[dev-dependencies]
ctrlc = { version = "3.4", features = ["termination"] }
prometheus-client = "0.21"
reqwest = { version = "0.11", features = ["blocking"] }
env_logger = "0.11"
prometheus-client = "0.22"
reqwest = { version = "0.12", features = ["blocking"] }

[features]
default = []
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ This crate provides a thread safe, minimalstic HTTP/S server used to buffer metr
Include the lib in your `Cargo.toml` dependencies:
```toml
[dependencies]
metrics_server = "0.13"
metrics_server = "0.14"
```

To enable TLS support, pass the optional feature flag:
```toml
[dependencies]
metrics_server = { version = "0.13", features = ["tls"] }
metrics_server = { version = "0.14", features = ["tls"] }
```

### HTTP
Expand Down
40 changes: 20 additions & 20 deletions examples/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,57 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use log::info;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::registry::Registry;

use metrics_server::MetricsServer;

fn main() {
env_logger::init();

// Register stop handler.
let stop = Arc::new(AtomicBool::new(false));
let s = stop.clone();
let (send, recv) = mpsc::channel();
ctrlc::set_handler(move || {
println!("Stopping...");
s.store(true, Ordering::Relaxed);
info!("Stopping metrics server");
send.send(()).unwrap();
})
.unwrap();

// Expose the Prometheus metrics.
let server = MetricsServer::http("localhost:8001");
println!("Starting metrics server: http://localhost:8001/metrics");
info!("Starting metrics server: http://localhost:8001/metrics");

std::thread::scope(|s| {
let handle = s.spawn(|| {
thread::scope(|s| {
let handle = s.spawn(move || {
// Create a metrics registry and counter that represents a single monotonically
// increasing counter.
let mut registry = Registry::default();
let counter: Counter = Counter::default();
registry.register("some_count", "Number of random counts", counter.clone());

// Increment the counter every 5 seconds.
// Increment the counter periodically.
loop {
if stop.load(Ordering::Relaxed) {
break;
}

counter.inc();

// Encode the current Registry in Prometheus format.
// Encode the current Registry in Prometheus format
let mut encoded = String::new();
encode(&mut encoded, &registry).unwrap();

// Update the Metrics Server with the current encoded data.
server.update(encoded.into());

thread::sleep(time::Duration::from_secs(5));
// Sleep for 5 seconds or exit.
if recv.recv_timeout(Duration::from_secs(5)).is_ok() {
// Stop server.
server.stop().unwrap();
break;
}
}
});

handle.join().unwrap();
});

// Stop server.
server.stop().unwrap();
}
60 changes: 35 additions & 25 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::thread;

use http::Uri;
use log::{debug, error};
use time::{format_description, OffsetDateTime};
use tiny_http::{ConfigListenAddr, Method, Response, Server};

use crate::error::ServerError;
Expand All @@ -15,11 +16,11 @@ pub const DEFAULT_METRICS_PATH: &str = "/metrics";

/// A thread-safe datastore for serving metrics via a HTTP/S server.
pub struct MetricsServer {
shared: Arc<MetricsServerShared>,
shared: Arc<SharedData>,
thread: Option<thread::JoinHandle<()>>,
}

struct MetricsServerShared {
struct SharedData {
data: Mutex<Vec<u8>>,
server: Server,
stop: AtomicBool,
Expand Down Expand Up @@ -60,7 +61,7 @@ impl MetricsServer {
let server = Server::new(config).map_err(|e| ServerError::Create(e.to_string()))?;

// Create an Arc of the shared data.
let shared = Arc::new(MetricsServerShared {
let shared = Arc::new(SharedData {
data: Mutex::new(Vec::new()),
server,
stop: AtomicBool::new(false),
Expand Down Expand Up @@ -107,13 +108,11 @@ impl MetricsServer {
server
}

/// Safely updates the data in a `MetricsServer` and returns the number of bytes written.
///
/// This method is protected by a mutex making it safe to call concurrently from multiple threads.
/// Thread safe method for updating the data in a `MetricsServer`, returning the number of bytes written.
pub fn update(&self, data: Vec<u8>) -> usize {
let mut buf = self.shared.data.lock().unwrap();
*buf = data;
buf.as_slice().len()
buf.len()
}

/// Start serving requests to the /metrics URL path on the underlying server.
Expand Down Expand Up @@ -153,37 +152,24 @@ impl MetricsServer {
break;
}

debug!(
"metrics_server: request received [url: '{}', remote_addr: '{}', http_version: '{}']",
req.url(),
req.remote_addr().map_or("N/A".to_string(), |v| v.to_string()),
req.http_version(),
);

// Only serve the specified uri path.
if req.url().to_lowercase() != u {
let res = Response::empty(404);
if let Err(e) = req.respond(res) {
error!("metrics_server error: {}", e);
};
respond(req, res);
continue;
}

// Only respond to GET requests.
if req.method() != &Method::Get {
let res = Response::empty(405);
if let Err(e) = req.respond(res) {
error!("metrics_server error: {}", e);
};
respond(req, res);
continue;
}

// Write the metrics to the response buffer.
let metrics = s.data.lock().unwrap();
let res = Response::from_data(metrics.as_slice());
if let Err(e) = req.respond(res) {
error!("metrics_server error: {}", e);
};
respond(req, res);
}
}
}));
Expand Down Expand Up @@ -212,10 +198,10 @@ impl MetricsServer {
}
}

/// Validate the provided URI or return the default /metrics on error.
// Validate the provided URI or return the default /metrics on error.
fn parse_uri(mut uri: String) -> String {
if !uri.starts_with('/') {
uri = format!("/{}", uri);
uri.insert(0, '/');
}

let u = match Uri::from_str(&uri) {
Expand All @@ -229,6 +215,30 @@ fn parse_uri(mut uri: String) -> String {
u.to_lowercase()
}

// Responds to a given request and logs in an Apache-like format.
fn respond<D>(req: tiny_http::Request, res: tiny_http::Response<D>)
where
D: std::io::Read,
{
let datetime = OffsetDateTime::now_utc()
.format(&format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "-".to_string());

debug!(
"{} [{}] \"{} {} HTTP/{}\" {}",
req.remote_addr().map_or("-".to_string(), |v| v.to_string()),
datetime,
req.method(),
req.url(),
req.http_version(),
res.status_code().0,
);

if let Err(e) = req.respond(res) {
error!("error sending metrics response: {e}");
};
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 493030c

Please sign in to comment.