Skip to content

Commit

Permalink
app: Implement a separate health check server
Browse files Browse the repository at this point in the history
A separate server provides identically behaving /live and /ready routes
to the admin server. Does not remove the existing admin server's routes.

Background:

On some Kubernetes distributions, requests from the control plane may
not originate from an IP address in a private range, or even from a
consistent IP address. This poses a problem, because the admin server used in a
address. This poses a problem, because the admin server used in a
multicluster mesh needs to simultaneously serve /live and /ready routes
to:

* The Kubernetes control plane, for liveness and readiness probes
  respectively
* Remote clusters, as part of probing the remote gateway

In order to avoid exposing the other admin routes, the multicluster
gateway uses an authorization policy forbidding unauthorized and
out-of-cluster requests. This causes the gateway to fail readiness and
liveness probes.

Resolution:

Implement a separate server in the proxy app that can securely serve
/live and /ready routes. The port that server listens on can be used for
health check probes internally, without an authorization policy.

See: linkerd/linkerd2#7548

Signed-off-by: Aaron Friel <[email protected]>
  • Loading branch information
AaronFriel committed Jan 3, 2022
1 parent 56a4511 commit ddc0b9d
Show file tree
Hide file tree
Showing 13 changed files with 578 additions and 10 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ dependencies = [
"linkerd-app-admin",
"linkerd-app-core",
"linkerd-app-gateway",
"linkerd-app-health",
"linkerd-app-inbound",
"linkerd-app-outbound",
"linkerd-error",
Expand Down Expand Up @@ -855,6 +856,21 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-app-health"
version = "0.1.0"
dependencies = [
"futures",
"http",
"hyper",
"linkerd-app-core",
"linkerd-app-inbound",
"thiserror",
"tokio",
"tower",
"tracing",
]

[[package]]
name = "linkerd-app-inbound"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ allow-loopback = ["linkerd-app-outbound/allow-loopback"]
[dependencies]
futures = { version = "0.3", default-features = false }
linkerd-app-admin = { path = "./admin" }
linkerd-app-health = { path = "./health" }
linkerd-app-core = { path = "./core" }
linkerd-app-gateway = { path = "./gateway" }
linkerd-app-inbound = { path = "./inbound" }
Expand Down
31 changes: 31 additions & 0 deletions linkerd/app/health/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "linkerd-app-health"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false
description = """
The linkerd proxy's health check server.
"""

[dependencies]
http = "0.2"
hyper = { version = "0.14", features = ["http1", "http2"] }
futures = { version = "0.3", default-features = false }
linkerd-app-core = { path = "../core" }
linkerd-app-inbound = { path = "../inbound" }
thiserror = "1"
tokio = { version = "1", features = ["macros", "sync", "parking_lot"]}
tracing = "0.1"

[dependencies.tower]
version = "0.4"
default-features = false
features = [
"buffer",
"make",
"spawn-ready",
"timeout",
"util",
]
8 changes: 8 additions & 0 deletions linkerd/app/health/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#![deny(warnings, rust_2018_idioms)]
#![forbid(unsafe_code)]

mod server;
mod stack;

pub use self::server::{Health, Latch, Readiness};
pub use self::stack::{Config, Task};
127 changes: 127 additions & 0 deletions linkerd/app/health/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//! Serves an HTTP health server.
//!
//! * `GET /ready` -- returns 200 when the proxy is ready to participate in meshed
//! traffic.
//! * `GET /live` -- returns 200 when the proxy is live.

use futures::future;
use http::StatusCode;
use hyper::{
body::{Body, HttpBody},
Request, Response,
};
use linkerd_app_core::Error;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

mod readiness;

pub use self::readiness::{Latch, Readiness};

/// An HTTP service answering `/ready` and `/live` health probes.
#[derive(Clone)]
pub struct Health {
    // Shared readiness handle; reports ready once all latches are dropped.
    ready: Readiness,
}

/// Boxed response future returned by [`Health`]'s `tower::Service` impl.
pub type ResponseFuture =
    Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send + 'static>>;

impl Health {
    /// Builds a health service over the given readiness handle.
    pub fn new(ready: Readiness) -> Self {
        Self { ready }
    }

    /// Builds the response for `GET /ready`: 200 once every readiness latch
    /// has been released, 503 until then.
    fn ready_rsp(&self) -> Response<Body> {
        let builder = Response::builder();
        if self.ready.is_ready() {
            builder
                .header(http::header::CONTENT_TYPE, "text/plain")
                .status(StatusCode::OK)
                .body("ready\n".into())
                .expect("builder with known status code must not fail")
        } else {
            // NOTE(review): unlike the other text responses, this branch sets
            // no content-type header — confirm whether that is intentional.
            builder
                .status(StatusCode::SERVICE_UNAVAILABLE)
                .body("not ready\n".into())
                .expect("builder with known status code must not fail")
        }
    }

    /// Builds the response for `GET /live`: unconditionally 200.
    fn live_rsp() -> Response<Body> {
        Response::builder()
            .header(http::header::CONTENT_TYPE, "text/plain")
            .status(StatusCode::OK)
            .body("live\n".into())
            .expect("builder with known status code must not fail")
    }

    /// Builds the fallback 404 response for all other paths.
    fn not_found() -> Response<Body> {
        Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::empty())
            .expect("builder with known status code must not fail")
    }
}

impl<B> tower::Service<http::Request<B>> for Health
where
    B: HttpBody + Send + Sync + 'static,
    B::Error: Into<Error>,
    B::Data: Send,
{
    type Response = http::Response<Body>;
    type Error = Error;
    type Future = ResponseFuture;

    /// Always ready: responses are computed inline with no backpressure.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Routes by exact path; anything other than `/live` or `/ready` is 404.
    fn call(&mut self, req: Request<B>) -> Self::Future {
        let rsp = match req.uri().path() {
            "/live" => Self::live_rsp(),
            "/ready" => self.ready_rsp(),
            _ => Self::not_found(),
        };
        Box::pin(future::ok(rsp))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use http::method::Method;
    use std::time::Duration;
    use tokio::time::timeout;
    use tower::util::ServiceExt;

    // Upper bound on each probe call so a hung service fails the test fast.
    const TIMEOUT: Duration = Duration::from_secs(1);

    /// `/ready` must report 503 while any latch (or clone) is alive, and 200
    /// only after every latch has been dropped.
    #[tokio::test]
    async fn ready_when_latches_dropped() {
        let (r, l0) = Readiness::new();
        let l1 = l0.clone();

        let health = Health::new(r);
        // Issues one GET /ready against a fresh clone of the service and
        // returns the response (panicking on timeout or call failure).
        macro_rules! call {
            () => {{
                let r = Request::builder()
                    .method(Method::GET)
                    .uri("http://0.0.0.0/ready")
                    .body(Body::empty())
                    .unwrap();
                let f = health.clone().oneshot(r);
                timeout(TIMEOUT, f).await.expect("timeout").expect("call")
            }};
        }

        // Both latches alive: not ready.
        assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE);

        // One latch still alive: still not ready.
        drop(l0);
        assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE);

        // All latches dropped: ready.
        drop(l1);
        assert_eq!(call!().status(), StatusCode::OK);
    }
}
36 changes: 36 additions & 0 deletions linkerd/app/health/src/server/readiness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::sync::{Arc, Weak};

/// Tracks the process's readiness to serve traffic.
///
/// Readiness is one-way: once `is_ready()` reports true it never reverts to
/// false, because dropped latches cannot be recreated.
#[derive(Clone, Debug)]
pub struct Readiness(Weak<()>);

/// A handle that holds the process in the not-ready state.
///
/// The process is considered ready once every `Latch` (including all clones)
/// has been dropped.
#[derive(Clone, Debug)]
pub struct Latch(Arc<()>);

impl Readiness {
    /// Creates a readiness handle paired with its initial latch.
    pub fn new() -> (Readiness, Latch) {
        let inner = Arc::new(());
        let weak = Arc::downgrade(&inner);
        (Readiness(weak), Latch(inner))
    }

    /// Returns true iff no latches remain alive.
    pub fn is_ready(&self) -> bool {
        // The upgrade fails exactly when every strong reference — i.e. every
        // `Latch` — has been dropped.
        self.0.upgrade().is_none()
    }
}

/// Always ready: a default `Readiness` has no associated latches.
impl Default for Readiness {
    fn default() -> Self {
        let (readiness, latch) = Self::new();
        latch.release();
        readiness
    }
}

impl Latch {
    /// Releases this readiness latch by consuming (and thus dropping) it.
    pub fn release(self) {}
}
Loading

0 comments on commit ddc0b9d

Please sign in to comment.