Skip to content
Open
12 changes: 9 additions & 3 deletions docs/user_guide/phase.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ Pingora-proxy allows users to insert arbitrary logic into the life of a request.
```mermaid
graph TD;
start("new request")-->early_request_filter;
early_request_filter-->request_filter;
early_request_filter-->early_request_body_filter;
early_request_body_filter-->request_filter;
request_filter-->upstream_peer;

upstream_peer-->Connect{{IO: connect to upstream}};
Expand Down Expand Up @@ -59,11 +60,16 @@ This is the first phase of every request.

This function is similar to `request_filter()` but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules.

### `early_request_body_filter()`
This phase runs during early body buffering, **before** `request_filter()` and `upstream_peer()`. It is only called when `early_request_body_buffer_limit()` returns `Some(max_size)`, which opts in to reading and buffering the full request body before upstream peer selection.

Use this for processing that must happen before header filters run, such as streaming decompression. The buffered body is then available via `session.get_buffered_body()` in `request_filter()` for routing decisions, auth signature verification, or body mutation.

### `request_filter()`
This phase is usually for validating request inputs, rate limiting, and initializing context.
This phase is usually for validating request inputs, rate limiting, and initializing context. When early body buffering is enabled, the full body is already available via `session.get_buffered_body()`.

### `request_body_filter()`
This phase is triggered after a request body is ready to send to upstream. It will be called every time a piece of request body is received.
This phase is triggered after a request body is ready to send to upstream. It will be called every time a piece of request body is received. This runs during the upstream forwarding phase, after `upstream_peer()` and connection establishment.

### `proxy_upstream_filter()`
This phase determines if we should continue to the upstream to serve a response. If we short-circuit, a 502 is returned by default, but a different response can be implemented.
Expand Down
3 changes: 2 additions & 1 deletion docs/user_guide/phase_chart.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ Pingora proxy phases without caching
```mermaid
graph TD;
start("new request")-->early_request_filter;
early_request_filter-->request_filter;
early_request_filter-->early_request_body_filter;
early_request_body_filter-->request_filter;
request_filter-->upstream_peer;

upstream_peer-->Connect{{IO: connect to upstream}};
Expand Down
165 changes: 165 additions & 0 deletions pingora-proxy/examples/body_routing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2026 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Example: early request body buffering for routing and body mutation.
//!
//! Demonstrates three patterns enabled by `early_request_body_buffer_limit()` and
//! `early_request_body_filter()`:
//!
//! 1. **Stream**: process each body chunk as it arrives in
//! `early_request_body_filter()` — before any header-phase filters run.
//! The example logs each chunk's byte count to show the streaming nature.
//!
//! 2. **Peek**: read the assembled buffered body with `get_buffered_body()`
//! in `request_filter()` to make routing decisions.
//!
//! 3. **Mutate**: replace the buffered body with `set_buffered_body()` so
//! the upstream receives the modified version.
//!
//! Uses httpbin.org as the upstream — its `/post` endpoint echoes back the
//! request body, so you can verify mutations in the response.

use async_trait::async_trait;
use bytes::Bytes;
use log::info;

use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_proxy::{ProxyHttp, Session};

pub struct MyProxy;

pub struct MyCtx {
route_beta: bool,
chunks_received: usize,
bytes_received: usize,
}

#[async_trait]
impl ProxyHttp for MyProxy {
type CTX = MyCtx;
fn new_ctx(&self) -> Self::CTX {
MyCtx {
route_beta: false,
chunks_received: 0,
bytes_received: 0,
}
}

/// Opt in to body buffering for POST requests up to 4KB.
fn early_request_body_buffer_limit(
&self,
session: &Session,
_ctx: &Self::CTX,
) -> Option<usize> {
if session.req_header().method == http::Method::POST {
Some(4096)
} else {
None
}
}

/// Stream: process each body chunk as it arrives during early buffering.
///
/// This fires per-chunk, BEFORE request_filter sees the assembled body.
async fn early_request_body_filter(
&self,
_session: &mut Session,
body: &mut Option<Bytes>,
end_of_stream: bool,
ctx: &mut Self::CTX,
) -> Result<()> {
if let Some(data) = body {
ctx.chunks_received += 1;
ctx.bytes_received += data.len();
info!(
"early_request_body_filter: chunk {} ({} bytes, {} total)",
ctx.chunks_received,
data.len(),
ctx.bytes_received
);
}
if end_of_stream {
info!(
"early_request_body_filter: done — {} chunks, {} bytes total",
ctx.chunks_received, ctx.bytes_received
);
}
Ok(())
}

async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
// By the time request_filter runs, the body is already buffered.
if let Some(body) = session.get_buffered_body() {
let text = std::str::from_utf8(body).unwrap_or("");

// Peek: inspect body content for routing decisions.
ctx.route_beta = text.contains("beta");
info!("peek: route_beta={}", ctx.route_beta);

// Mutate: wrap the original body in an envelope before forwarding.
let wrapped = format!(r#"{{"envelope":true,"original":{text}}}"#);
info!("mutate: {wrapped}");
session.set_buffered_body(Some(Bytes::from(wrapped.clone())));

// Update Content-Length to match the new body size.
session
.req_header_mut()
.insert_header(http::header::CONTENT_LENGTH, wrapped.len().to_string())?;
}
Ok(false)
}

async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
// httpbin.org echoes the request body back in its JSON response,
// so we can verify the mutation in the curl output.
let peer = Box::new(HttpPeer::new(
("httpbin.org", 443),
true,
"httpbin.org".to_string(),
));
Ok(peer)
}
}

// RUST_LOG=INFO cargo run --features openssl --example body_routing
//
// Peek + mutate — body is inspected for routing then wrapped in an envelope:
// curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "beta"}'
// curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "default"}'
//
// Multi-chunk — use chunked transfer encoding to see early_request_body_filter fire per-chunk:
// printf 'POST /post HTTP/1.1\r\nHost: httpbin.org\r\nTransfer-Encoding: chunked\r\n\r\na\r\n{"part":1}\r\na\r\n{"part":2}\r\n0\r\n\r\n' | nc 127.0.0.1 6193
//
// No buffering — GET requests pass through unchanged:
// curl 127.0.0.1:6193/get -H "Host: httpbin.org"
fn main() {
env_logger::init();

let opt = Opt::parse_args();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();

let mut my_proxy = pingora_proxy::http_proxy_service(&my_server.configuration, MyProxy);
my_proxy.add_tcp("0.0.0.0:6193");

my_server.add_service(my_proxy);
my_server.run_forever();
}
Loading
Loading