diff --git a/docs/user_guide/phase.md b/docs/user_guide/phase.md index 3c80f913..2e357211 100644 --- a/docs/user_guide/phase.md +++ b/docs/user_guide/phase.md @@ -17,7 +17,8 @@ Pingora-proxy allows users to insert arbitrary logic into the life of a request. ```mermaid graph TD; start("new request")-->early_request_filter; - early_request_filter-->request_filter; + early_request_filter-->early_request_body_filter; + early_request_body_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; @@ -59,11 +60,16 @@ This is the first phase of every request. This function is similar to `request_filter()` but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules. +### `early_request_body_filter()` +This phase runs during early body buffering, **before** `request_filter()` and `upstream_peer()`. It is only called when `early_request_body_buffer_limit()` returns `Some(max_size)`, which opts in to reading and buffering the full request body before upstream peer selection. + +Use this for processing that must happen before header filters run, such as streaming decompression. The buffered body is then available via `session.get_buffered_body()` in `request_filter()` for routing decisions, auth signature verification, or body mutation. + ### `request_filter()` -This phase is usually for validating request inputs, rate limiting, and initializing context. +This phase is usually for validating request inputs, rate limiting, and initializing context. When early body buffering is enabled, the full body is already available via `session.get_buffered_body()`. ### `request_body_filter()` -This phase is triggered after a request body is ready to send to upstream. It will be called every time a piece of request body is received. +This phase is triggered after a request body is ready to send to upstream. It will be called every time a piece of request body is received. This runs during the upstream forwarding phase, after `upstream_peer()` and connection establishment. ### `proxy_upstream_filter()` This phase determines if we should continue to the upstream to serve a response. If we short-circuit, a 502 is returned by default, but a different response can be implemented. diff --git a/docs/user_guide/phase_chart.md b/docs/user_guide/phase_chart.md index 94988724..73adae14 100644 --- a/docs/user_guide/phase_chart.md +++ b/docs/user_guide/phase_chart.md @@ -2,7 +2,8 @@ Pingora proxy phases without caching ```mermaid graph TD; start("new request")-->early_request_filter; - early_request_filter-->request_filter; + early_request_filter-->early_request_body_filter; + early_request_body_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; diff --git a/pingora-proxy/examples/body_routing.rs b/pingora-proxy/examples/body_routing.rs new file mode 100644 index 00000000..85baf1b9 --- /dev/null +++ b/pingora-proxy/examples/body_routing.rs @@ -0,0 +1,165 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Example: early request body buffering for routing and body mutation. +//! +//! Demonstrates three patterns enabled by `early_request_body_buffer_limit()` and +//! `early_request_body_filter()`: +//! +//! 1. **Stream**: process each body chunk as it arrives in +//! `early_request_body_filter()` — before any header-phase filters run. +//! The example logs each chunk's byte count to show the streaming nature. +//! +//! 2. **Peek**: read the assembled buffered body with `get_buffered_body()` +//! in `request_filter()` to make routing decisions. +//! +//! 3. **Mutate**: replace the buffered body with `set_buffered_body()` so +//! the upstream receives the modified version. +//! +//! Uses httpbin.org as the upstream — its `/post` endpoint echoes back the +//! request body, so you can verify mutations in the response. + +use async_trait::async_trait; +use bytes::Bytes; +use log::info; + +use pingora_core::server::configuration::Opt; +use pingora_core::server::Server; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_proxy::{ProxyHttp, Session}; + +pub struct MyProxy; + +pub struct MyCtx { + route_beta: bool, + chunks_received: usize, + bytes_received: usize, +} + +#[async_trait] +impl ProxyHttp for MyProxy { + type CTX = MyCtx; + fn new_ctx(&self) -> Self::CTX { + MyCtx { + route_beta: false, + chunks_received: 0, + bytes_received: 0, + } + } + + /// Opt in to body buffering for POST requests up to 4KB. + fn early_request_body_buffer_limit( + &self, + session: &Session, + _ctx: &Self::CTX, + ) -> Option { + if session.req_header().method == http::Method::POST { + Some(4096) + } else { + None + } + } + + /// Stream: process each body chunk as it arrives during early buffering. + /// + /// This fires per-chunk, BEFORE request_filter sees the assembled body. + async fn early_request_body_filter( + &self, + _session: &mut Session, + body: &mut Option, + end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> Result<()> { + if let Some(data) = body { + ctx.chunks_received += 1; + ctx.bytes_received += data.len(); + info!( + "early_request_body_filter: chunk {} ({} bytes, {} total)", + ctx.chunks_received, + data.len(), + ctx.bytes_received + ); + } + if end_of_stream { + info!( + "early_request_body_filter: done — {} chunks, {} bytes total", + ctx.chunks_received, ctx.bytes_received + ); + } + Ok(()) + } + + async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result { + // By the time request_filter runs, the body is already buffered. + if let Some(body) = session.get_buffered_body() { + let text = std::str::from_utf8(body).unwrap_or(""); + + // Peek: inspect body content for routing decisions. + ctx.route_beta = text.contains("beta"); + info!("peek: route_beta={}", ctx.route_beta); + + // Mutate: wrap the original body in an envelope before forwarding. + let wrapped = format!(r#"{{"envelope":true,"original":{text}}}"#); + info!("mutate: {wrapped}"); + session.set_buffered_body(Some(Bytes::from(wrapped.clone()))); + + // Update Content-Length to match the new body size. + session + .req_header_mut() + .insert_header(http::header::CONTENT_LENGTH, wrapped.len().to_string())?; + } + Ok(false) + } + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result> { + // httpbin.org echoes the request body back in its JSON response, + // so we can verify the mutation in the curl output. + let peer = Box::new(HttpPeer::new( + ("httpbin.org", 443), + true, + "httpbin.org".to_string(), + )); + Ok(peer) + } +} + +// RUST_LOG=INFO cargo run --features openssl --example body_routing +// +// Peek + mutate — body is inspected for routing then wrapped in an envelope: +// curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "beta"}' +// curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "default"}' +// +// Multi-chunk — use chunked transfer encoding to see early_request_body_filter fire per-chunk: +// printf 'POST /post HTTP/1.1\r\nHost: httpbin.org\r\nTransfer-Encoding: chunked\r\n\r\na\r\n{"part":1}\r\na\r\n{"part":2}\r\n0\r\n\r\n' | nc 127.0.0.1 6193 +// +// No buffering — GET requests pass through unchanged: +// curl 127.0.0.1:6193/get -H "Host: httpbin.org" +fn main() { + env_logger::init(); + + let opt = Opt::parse_args(); + let mut my_server = Server::new(Some(opt)).unwrap(); + my_server.bootstrap(); + + let mut my_proxy = pingora_proxy::http_proxy_service(&my_server.configuration, MyProxy); + my_proxy.add_tcp("0.0.0.0:6193"); + + my_server.add_service(my_proxy); + my_server.run_forever(); +} diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index f89f53d3..4199931b 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -468,6 +468,13 @@ pub struct Session { upstream_write_pending_time: Duration, /// Flag that is set when the shutdown process has begun. shutdown_flag: Arc, + /// Request body buffered early (before upstream connection) for auth/routing decisions. + /// When set, body forwarding will use this instead of re-reading from downstream. + /// Use accessor methods: `get_buffered_body()`, `take_buffered_body()`, `set_buffered_body()`. + buffered_request_body: Option, + /// Whether body has been fully consumed for buffering. + /// Use accessor: `is_body_buffered()`. + body_buffered: bool, } impl Session { @@ -489,6 +496,8 @@ impl Session { upstream_body_bytes_received: 0, upstream_write_pending_time: Duration::ZERO, shutdown_flag, + buffered_request_body: None, + body_buffered: false, } } @@ -675,6 +684,73 @@ impl Session { self.shutdown_flag.load(Ordering::Acquire) } + /// Returns a reference to the buffered request body, if any. + /// + /// The body is buffered by `buffer_request_body_early()` when the trait method + /// `early_request_body_buffer_limit()` returns `Some(max_size)`. + pub fn get_buffered_body(&self) -> Option<&Bytes> { + self.buffered_request_body.as_ref() + } + + /// Takes ownership of the buffered request body, leaving `None` in its place. + /// + /// Use this when forwarding the body to upstream - takes the body once for sending. + pub fn take_buffered_body(&mut self) -> Option { + self.buffered_request_body.take() + } + + /// Sets the buffered request body. + /// + /// This is called by `buffer_request_body_early()` after reading the full body. + /// Also useful for app code that wants to replace the body (e.g., decompression). + pub fn set_buffered_body(&mut self, body: Option) { + self.body_buffered = body.is_some() || self.body_buffered; + self.buffered_request_body = body; + } + + /// Returns whether a body has been buffered (or confirmed empty). + /// + /// When `true`, the body has been fully read and is available via `get_buffered_body()`, + /// or the request has no body. Body forwarding will skip re-reading from downstream. + pub fn is_body_buffered(&self) -> bool { + self.body_buffered + } + + /// Marks the body as buffered without setting a body. + /// + /// Used when the request has no body (no Content-Length or Transfer-Encoding), + /// to prevent `buffer_request_body_early()` from attempting to read. + pub fn mark_body_buffered(&mut self) { + self.body_buffered = true; + } + + /// Creates a Session from an H1 HttpSession (for testing only). + #[cfg(test)] + pub fn new_h1_with_http_session( + http_session: pingora_core::protocols::http::v1::server::HttpSession, + ) -> Self { + use pingora_cache::HttpCache; + use pingora_core::protocols::http::compression::ResponseCompressionCtx; + use pingora_core::protocols::http::ServerSession; + + let shutdown_flag = Arc::new(AtomicBool::new(false)); + Session { + downstream_session: Box::new(ServerSession::H1(http_session)), + cache: HttpCache::new(), + upstream_compression: ResponseCompressionCtx::new(0, false, false), + ignore_downstream_range: false, + upstream_headers_mutated_for_cache: false, + subrequest_ctx: None, + subrequest_spawner: None, + downstream_modules_ctx: HttpModuleCtx::empty(), + upstream_body_bytes_received: 0, + upstream_write_pending_time: Duration::ZERO, + shutdown_flag, + buffered_request_body: None, + body_buffered: false, + } + } + pub fn downstream_custom_message( &mut self, ) -> Result< @@ -757,6 +833,16 @@ where .await; } + // early body buffering: read full request body before request_filter + // see https://github.com/cloudflare/pingora/issues/780 + if !session.is_body_buffered() { + if let Err(e) = self.buffer_request_body_early(&mut session, &mut ctx).await { + return self + .handle_error(session, &mut ctx, e, "Failed to buffer request body:") + .await; + } + } + if self.inner.allow_spawning_subrequest(&session, &ctx) { session.subrequest_spawner = Some(SubrequestSpawner::new(self.clone())); } @@ -983,6 +1069,124 @@ where None } } + + /// Buffer the entire request body before connecting to upstream. + /// + /// This enables early_request_body_filter to run BEFORE upstream_peer selection, + /// allowing auth signature verification and content-based routing. + /// + /// Buffering is controlled by the trait method `early_request_body_buffer_limit()`: + /// - Returns `None`: Skip buffering, stream body to upstream (default) + /// - Returns `Some(max_size)`: Buffer body with size limit enforcement + /// + /// Size limit enforcement: + /// - Content-Length checked first (fail fast before reading) + /// - Accumulated size checked during reading (streaming protection) + /// - Returns HTTP 413 (Payload Too Large) if exceeded + async fn buffer_request_body_early( + &self, + session: &mut Session, + ctx: &mut ::CTX, + ) -> Result<()> + where + SV: ProxyHttp + Send + Sync, + ::CTX: Send + Sync, + { + // check trait method for opt-in and size limit + let Some(max_size) = self.inner.early_request_body_buffer_limit(session, ctx) else { + return Ok(()); + }; + + // skip if already buffered + if session.is_body_buffered() { + return Ok(()); + } + + let content_length = session + .downstream_session + .req_header() + .headers + .get(header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()); + + // fail fast: reject before reading if Content-Length exceeds limit + if let Some(cl) = content_length { + if cl > max_size { + return Error::e_explain( + HTTPStatus(413), + format!( + "Request body too large: Content-Length {} exceeds limit {} bytes", + cl, max_size + ), + ); + } + } + + // Content-Length: 0 means no body; for all other cases (no Content-Length, + // Transfer-Encoding, HTTP/2) attempt to read. read_body_or_idle returns + // None immediately if there's nothing. + if content_length == Some(0) { + session.mark_body_buffered(); + return Ok(()); + } + + let mut body_parts: Vec = Vec::new(); + let mut total_size: usize = 0; + + // read body chunks until end of stream + loop { + let body_chunk: Option = + match session.downstream_session.read_body_or_idle(false).await { + Ok(chunk) => chunk, + Err(e) => return Err(e.into_down()), + }; + + // end of stream: None means no more data, or downstream reports done + let end_of_body = body_chunk.is_none() || session.downstream_session.is_body_done(); + + // run early body filter (not module filters, they haven't run header filter yet) + let mut filter_data = body_chunk; + self.inner + .early_request_body_filter(session, &mut filter_data, end_of_body, ctx) + .await?; + + // accumulate the (possibly filtered) data + if let Some(filtered) = filter_data { + total_size += filtered.len(); + + // check size limit during accumulation + if total_size > max_size { + return Error::e_explain( + HTTPStatus(413), + format!( + "Request body exceeded limit: {} > {} bytes", + total_size, max_size + ), + ); + } + + body_parts.push(filtered); + } + + if end_of_body { + break; + } + } + + // combine all chunks into a single Bytes buffer + if total_size > 0 { + let mut combined = bytes::BytesMut::with_capacity(total_size); + for part in body_parts { + combined.extend_from_slice(&part); + } + session.set_buffered_body(Some(combined.freeze())); + } else { + session.mark_body_buffered(); + } + + Ok(()) + } } /* Make process_subrequest() a trait to workaround https://github.com/rust-lang/rust/issues/78649 @@ -1395,3 +1599,79 @@ where Service::new(name, proxy) } } + +#[cfg(test)] +mod tests { + use super::*; + + // Test Session body buffering accessors + mod body_buffer { + use super::*; + use pingora_core::protocols::http::v1::server::HttpSession; + use tokio_test::io::Builder; + + fn create_test_session() -> Session { + // Create a minimal mock stream for testing using tokio-test Builder. + // Use empty mock since we only test accessor methods, not HTTP parsing. + let mock_io = Builder::new().build(); + let http_session = HttpSession::new(Box::new(mock_io)); + Session::new_h1_with_http_session(http_session) + } + + #[test] + fn test_initial_state() { + let session = create_test_session(); + assert!(!session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + + #[test] + fn test_set_and_get_buffered_body() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body.clone())); + + assert!(session.is_body_buffered()); + assert_eq!(session.get_buffered_body(), Some(&body)); + } + + #[test] + fn test_take_buffered_body() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body.clone())); + let taken = session.take_buffered_body(); + + assert_eq!(taken, Some(body)); + assert!(session.get_buffered_body().is_none()); + // is_body_buffered should remain true after take + assert!(session.is_body_buffered()); + } + + #[test] + fn test_mark_body_buffered() { + let mut session = create_test_session(); + + assert!(!session.is_body_buffered()); + session.mark_body_buffered(); + assert!(session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + + #[test] + fn test_set_none_preserves_buffered_flag() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body)); + assert!(session.is_body_buffered()); + + // Setting None should preserve the buffered flag + session.set_buffered_body(None); + assert!(session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + } +} diff --git a/pingora-proxy/src/proxy_common.rs b/pingora-proxy/src/proxy_common.rs index e1d36f69..19dbd65b 100644 --- a/pingora-proxy/src/proxy_common.rs +++ b/pingora-proxy/src/proxy_common.rs @@ -5,6 +5,8 @@ pub(crate) enum DownstreamStateMachine { Reading, /// no more data to read ReadingFinished, + /// body was pre-buffered before upstream connection, skip all downstream polling + PreBuffered, /// downstream is already errored or closed Errored, } @@ -19,9 +21,10 @@ impl DownstreamStateMachine { } } - // Can call read() to read more data or wait on closing + // Can call read() to read more data or wait on closing. + // PreBuffered skips polling since we already have the complete body. pub fn can_poll(&self) -> bool { - !matches!(self, Self::Errored) + !matches!(self, Self::Errored | Self::PreBuffered) } pub fn is_reading(&self) -> bool { diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index 9f04289c..05bef547 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -302,9 +302,19 @@ where .await?; } - let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done()); - - let buffer = session.as_ref().get_retry_buffer(); + // determine initial downstream state and body buffer + // pre-buffered body (from buffer_request_body_early) takes precedence over retry buffer + let (mut downstream_state, buffer) = if let Some(body) = session + .take_buffered_body() + .filter(|_| session.is_body_buffered()) + { + (DownstreamStateMachine::PreBuffered, Some(body)) + } else { + ( + DownstreamStateMachine::new(session.as_mut().is_body_done()), + session.as_ref().get_retry_buffer(), + ) + }; // retry, send buffer if it exists or body empty if buffer.is_some() || session.as_mut().is_body_empty() { diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 808da5bc..b40ac53f 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -297,10 +297,22 @@ where .await?; } - let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done()); + // determine initial downstream state and body buffer + // pre-buffered body (from buffer_request_body_early) takes precedence over retry buffer + let (mut downstream_state, buffer) = if let Some(body) = session + .take_buffered_body() + .filter(|_| session.is_body_buffered()) + { + (DownstreamStateMachine::PreBuffered, Some(body)) + } else { + ( + DownstreamStateMachine::new(session.as_mut().is_body_done()), + session.as_mut().get_retry_buffer(), + ) + }; // retry, send buffer if it exists - if let Some(buffer) = session.as_mut().get_retry_buffer() { + if let Some(buffer) = buffer { self.send_body_to2( session, Some(buffer), diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index d5a3efde..61625cd4 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -206,6 +206,57 @@ pub trait ProxyHttp { Ok(true) } + /// Determine whether to buffer the entire request body before connecting to upstream. + /// + /// This is called after [`Self::early_request_filter()`] but before [`Self::request_filter()`] + /// and [`Self::upstream_peer()`]. The body is buffered in `Session::buffered_request_body` + /// and can be accessed via [`Session::get_buffered_body()`]. + /// + /// # Returns + /// - `None`: Don't buffer, stream body to upstream (default) + /// - `Some(max_size)`: Buffer body with size limit, return 413 error if exceeded + /// + /// # Use Cases + /// - Auth signature verification (need full body before auth decision) + /// - Content-based routing decisions + /// - Body transformation before upstream selection + /// + /// # Size Limit Enforcement + /// When returning `Some(max_size)`: + /// - Content-Length header is checked first (fail fast before reading) + /// - Body size is checked during accumulation (streaming protection) + /// - If exceeded, returns HTTP 413 (Payload Too Large) + fn early_request_body_buffer_limit( + &self, + _session: &Session, + _ctx: &Self::CTX, + ) -> Option { + None // Default: stream body to upstream + } + + /// Handle each chunk of request body during early buffering. + /// + /// This is called during [`buffer_request_body_early()`] for each body chunk, **before** + /// [`Self::request_filter()`] and [`Self::upstream_peer()`]. Use this for processing + /// that must happen before header filters run (e.g., streaming decompression). + /// + /// Unlike [`Self::request_body_filter()`], this callback explicitly runs before any + /// header-phase filters, so it should not depend on state set by [`Self::request_filter()`]. + /// + /// The normal [`Self::request_body_filter()`] still runs during upstream body forwarding. + async fn early_request_body_filter( + &self, + _session: &mut Session, + _body: &mut Option, + _end_of_stream: bool, + _ctx: &mut Self::CTX, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + Ok(()) + } + /// Decide if the response is cacheable fn response_cache_filter( &self,