diff --git a/src/client.rs b/src/client.rs index 0c3415c7..4393fdc7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1485,6 +1485,22 @@ impl ResponseFuture { pub fn stream_id(&self) -> crate::StreamId { crate::StreamId::from_internal(self.inner.stream_id()) } + + /// Polls for informational responses (1xx status codes). + /// + /// This method should be called before polling the main response future + /// to check for any informational responses that have been received. + /// + /// Returns `Poll::Ready(Some(response))` if an informational response is available, + /// `Poll::Ready(None)` if no more informational responses are expected, + /// or `Poll::Pending` if no informational response is currently available. + pub fn poll_informational( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, crate::Error>>> { + self.inner.poll_informational(cx).map_err(Into::into) + } + /// Returns a stream of PushPromises /// /// # Panics diff --git a/src/codec/error.rs b/src/codec/error.rs index 0acb913e..3bcd1320 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -49,6 +49,9 @@ pub enum UserError { /// Tries to send push promise to peer who has disabled server push PeerDisabledServerPush, + + /// Invalid status code for informational response (must be 1xx) + InvalidInformationalStatusCode, } // ===== impl SendError ===== @@ -97,6 +100,7 @@ impl fmt::Display for UserError { SendPingWhilePending => "send_ping before received previous pong", SendSettingsWhilePending => "sending SETTINGS before received previous ACK", PeerDisabledServerPush => "sending PUSH_PROMISE to peer who disabled server push", + InvalidInformationalStatusCode => "invalid informational status code", }) } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index d49ec380..86ba41cf 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -66,6 +66,7 @@ pub(super) enum Event { Headers(peer::PollMessage), Data(Bytes), Trailers(HeaderMap), + InformationalHeaders(peer::PollMessage), } #[derive(Debug)] @@ -264,6 +265,21 @@ impl Recv { // corresponding headers frame pushed to `stream.pending_recv`. self.pending_accept.push(stream); } + } else { + // This is an informational response (1xx status code) + // Convert to response and store it for polling + let message = counts + .peer() + .convert_poll_message(pseudo, fields, stream_id)?; + + tracing::trace!("Received informational response: {:?}", message); + + // Push the informational response onto the stream's recv buffer + // with a special event type so it can be polled separately + stream + .pending_recv + .push_back(&mut self.buffer, Event::InformationalHeaders(message)); + stream.notify_recv(); } Ok(()) @@ -324,24 +340,63 @@ impl Recv { ) -> Poll, proto::Error>> { use super::peer::PollMessage::*; - // If the buffer is not empty, then the first frame must be a HEADERS - // frame or the user violated the contract. - match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), - Some(_) => panic!("poll_response called after response returned"), - None => { - if !stream.state.ensure_recv_open()? { - proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id); - return Poll::Ready(Err(Error::library_reset( - stream.id, - Reason::PROTOCOL_ERROR, - ))); + // Skip over any interim informational headers to find the main response + loop { + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)), + Some(Event::InformationalHeaders(_)) => { + // Skip interim informational headers - they should be consumed by poll_informational + continue; } + Some(_) => panic!("poll_response called after response returned"), + None => { + if !stream.state.ensure_recv_open()? { + proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id); + return Poll::Ready(Err(Error::library_reset( + stream.id, + Reason::PROTOCOL_ERROR, + ))); + } - stream.recv_task = Some(cx.waker().clone()); - Poll::Pending + stream.recv_task = Some(cx.waker().clone()); + return Poll::Pending; + } + } + } + } + + /// Called by the client to get informational responses (1xx status codes) + pub fn poll_informational( + &mut self, + cx: &Context, + stream: &mut store::Ptr, + ) -> Poll, proto::Error>>> { + use super::peer::PollMessage::*; + + // Try to pop the front event and check if it's an informational response + // If it's not, we put it back + if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) { + match event { + Event::InformationalHeaders(Client(response)) => { + // Found an informational response, return it + return Poll::Ready(Some(Ok(response))); + } + other => { + // Not an informational response, put it back at the front + stream.pending_recv.push_front(&mut self.buffer, other); + } } } + + // No informational response available at the front + if stream.state.ensure_recv_open()? { + // Request to get notified once more frames arrive + stream.recv_task = Some(cx.waker().clone()); + Poll::Pending + } else { + // No more frames will be received + Poll::Ready(None) + } } /// Transition the stream based on receiving trailers diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 6edb6b07..e1417c77 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -167,6 +167,47 @@ impl Send { Ok(()) } + /// Send interim informational headers (1xx responses) without changing stream state. + /// This allows multiple interim informational responses to be sent before the final response. + pub fn send_interim_informational_headers( + &mut self, + frame: frame::Headers, + buffer: &mut Buffer>, + stream: &mut store::Ptr, + _counts: &mut Counts, + task: &mut Option, + ) -> Result<(), UserError> { + tracing::trace!( + "send_interim_informational_headers; frame={:?}; stream_id={:?}", + frame, + frame.stream_id() + ); + + // Validate headers + Self::check_headers(frame.fields())?; + + // Ensure this is an informational response (1xx status code) + if !frame.is_informational() { + tracing::trace!( + "send_interim_informational_headers called with non-informational frame" + ); + return Err(UserError::UnexpectedFrameType); + } + + // Ensure the frame is not marked as end_stream for informational responses + if frame.is_end_stream() { + tracing::trace!("send_interim_informational_headers called with end_stream=true"); + return Err(UserError::UnexpectedFrameType); + } + + // Queue the frame for sending WITHOUT changing stream state + // This is the key difference from send_headers - we don't call stream.state.send_open() + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + + Ok(()) + } + /// Send an explicit RST_STREAM frame pub fn send_reset( &mut self, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index af177ff4..da612ddc 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1150,6 +1150,42 @@ impl StreamRef { } } + pub fn send_informational_headers(&mut self, frame: frame::Headers) -> Result<(), UserError> { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.opaque.key); + let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + me.counts.transition(stream, |counts, stream| { + // For informational responses (1xx), we need to send headers without + // changing the stream state. This allows multiple informational responses + // to be sent before the final response. + + // Validate that this is actually an informational response + if !frame.is_informational() { + return Err(UserError::UnexpectedFrameType); + } + + // Ensure the frame is not marked as end_stream for informational responses + if frame.is_end_stream() { + return Err(UserError::UnexpectedFrameType); + } + + // Send the interim informational headers directly to the buffer without state changes + // This bypasses the normal send_headers flow that would transition the stream state + actions.send.send_interim_informational_headers( + frame, + send_buffer, + stream, + counts, + &mut actions.task, + ) + }) + } + pub fn send_response( &mut self, mut response: Response<()>, @@ -1334,6 +1370,19 @@ impl OpaqueStreamRef { me.actions.recv.poll_response(cx, &mut stream) } + + /// Called by a client to check for informational responses (1xx status codes) + pub fn poll_informational( + &mut self, + cx: &Context, + ) -> Poll, proto::Error>>> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_informational(cx, &mut stream) + } /// Called by a client to check for a pushed request. pub fn poll_pushed( &mut self, diff --git a/src/server.rs b/src/server.rs index 3ce3eb43..284629a5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1102,6 +1102,105 @@ impl Default for Builder { // ===== impl SendResponse ===== impl SendResponse { + /// Send an interim informational response (1xx status codes) + /// + /// This method can be called multiple times before calling `send_response()` + /// to send the final response. Only 1xx status codes are allowed. + /// + /// Interim informational responses are used to provide early feedback to the client + /// before the final response is ready. Common examples include: + /// - 100 Continue: Indicates the client should continue with the request + /// - 103 Early Hints: Provides early hints about resources to preload + /// + /// # Arguments + /// * `response` - HTTP response with 1xx status code and headers + /// + /// # Returns + /// * `Ok(())` - Interim Informational response sent successfully + /// * `Err(Error)` - Failed to send (invalid status code, connection error, etc.) + /// + /// # Examples + /// ```rust + /// use h2::server; + /// use http::{Response, StatusCode}; + /// + /// # async fn example(mut send_response: h2::server::SendResponse) -> Result<(), h2::Error> { + /// // Send 100 Continue before processing request body + /// let continue_response = Response::builder() + /// .status(StatusCode::CONTINUE) + /// .body(()) + /// .unwrap(); + /// send_response.send_informational(continue_response)?; + /// + /// // Later send the final response + /// let final_response = Response::builder() + /// .status(StatusCode::OK) + /// .body(()) + /// .unwrap(); + /// let _stream = send_response.send_response(final_response, false)?; + /// # Ok(()) + /// # } + /// ``` + /// + /// # Errors + /// This method will return an error if: + /// - The response status code is not in the 1xx range + /// - The final response has already been sent + /// - There is a connection-level error + pub fn send_informational(&mut self, response: Response<()>) -> Result<(), crate::Error> { + let stream_id = self.inner.stream_id(); + let status = response.status(); + + tracing::trace!( + "send_informational called with status: {} on stream: {:?}", + status, + stream_id + ); + + // Validate that this is an informational response (1xx status code) + if !response.status().is_informational() { + tracing::trace!( + "invalid informational status code: {} on stream: {:?}", + status, + stream_id + ); + // Return an error for invalid status codes + return Err(crate::Error::from( + UserError::InvalidInformationalStatusCode, + )); + } + + tracing::trace!( + "converting informational response to HEADERS frame for stream: {:?}", + stream_id + ); + + // Convert the response to a HEADERS frame without END_STREAM flag + // Use the proper Peer::convert_send_message method for informational responses + let frame = Peer::convert_send_message( + stream_id, response, false, // NOT end_of_stream for informational responses + ); + + tracing::trace!( + "sending interim informational headers frame for stream: {:?}", + stream_id + ); + + // Use the proper H2 streams API for sending interim informational headers + // This bypasses the normal response flow and allows multiple informational responses + let result = self + .inner + .send_informational_headers(frame) + .map_err(Into::into); + + match &result { + Ok(()) => tracing::trace!("Successfully sent informational headers"), + Err(e) => tracing::trace!("Failed to send informational headers: {:?}", e), + } + + result + } + /// Send a response to a client request. /// /// On success, a [`SendStream`] instance is returned. This instance can be diff --git a/tests/h2-tests/tests/informational_responses.rs b/tests/h2-tests/tests/informational_responses.rs new file mode 100644 index 00000000..f607ef63 --- /dev/null +++ b/tests/h2-tests/tests/informational_responses.rs @@ -0,0 +1,387 @@ +#![deny(warnings)] + +use futures::{future::poll_fn, StreamExt}; +use h2_support::prelude::*; +use http::{Response, StatusCode}; + +#[tokio::test] +async fn send_100_continue() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + // Send a POST request + client + .send_frame(frames::headers(1).request("POST", "https://example.com/")) + .await; + + // Expect 100 Continue response first + client + .recv_frame(frames::headers(1).response(StatusCode::CONTINUE)) + .await; + + // Send request body after receiving 100 Continue + client + .send_frame(frames::data(1, &b"request body"[..]).eos()) + .await; + + // Expect final response + client + .recv_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::POST); + + // Send 100 Continue informational response + let continue_response = Response::builder() + .status(StatusCode::CONTINUE) + .body(()) + .unwrap(); + stream.send_informational(continue_response).unwrap(); + + // Send final response + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn send_103_early_hints() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + // Send a GET request + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + + // Expect 103 Early Hints response first + client + .recv_frame(frames::headers(1).response(StatusCode::EARLY_HINTS).field( + "link", + "; rel=preload; as=style, ; rel=preload; as=script", + )) + .await; + + // Expect final response + client + .recv_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + // Send 103 Early Hints informational response + let early_hints_response = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header( + "link", + "; rel=preload; as=style, ; rel=preload; as=script", + ) + .body(()) + .unwrap(); + stream.send_informational(early_hints_response).unwrap(); + + // Send final response + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn send_multiple_informational_responses() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + client + .send_frame(frames::headers(1).request("POST", "https://example.com/")) + .await; + + // Expect 100 Continue + client + .recv_frame(frames::headers(1).response(StatusCode::CONTINUE)) + .await; + + client + .send_frame(frames::data(1, &b"request body"[..]).eos()) + .await; + + // Expect 103 Early Hints + client + .recv_frame( + frames::headers(1) + .response(StatusCode::EARLY_HINTS) + .field("link", "; rel=preload; as=style"), + ) + .await; + + // Expect final response + client + .recv_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::POST); + + // Send 100 Continue + let continue_response = Response::builder() + .status(StatusCode::CONTINUE) + .body(()) + .unwrap(); + stream.send_informational(continue_response).unwrap(); + + // Send 103 Early Hints + let early_hints_response = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=style") + .body(()) + .unwrap(); + stream.send_informational(early_hints_response).unwrap(); + + // Send final response + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn invalid_informational_status_returns_error() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + + // Should only receive the final response since invalid informational response errors out + client + .recv_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + // Try to send invalid informational response (200 is not 1xx) + // This should return an error + let invalid_response = Response::builder().status(StatusCode::OK).body(()).unwrap(); + let result = stream.send_informational(invalid_response); + + // Expect error for invalid status code + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("invalid informational status code")); + + // Send actual final response after error + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn client_poll_informational_responses() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let recv_settings = srv.assert_client_handshake().await; + assert_default_settings!(recv_settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + + // Send 103 Early Hints + srv.send_frame( + frames::headers(1) + .response(StatusCode::EARLY_HINTS) + .field("link", "; rel=preload"), + ) + .await; + + // Send final response + srv.send_frame(frames::headers(1).response(StatusCode::OK).eos()) + .await; + }; + + let client = async move { + let (client, connection) = client::handshake(io).await.expect("handshake"); + + let request = Request::builder() + .method("GET") + .uri("https://example.com/") + .body(()) + .unwrap(); + + let (mut response_future, _) = client + .ready() + .await + .unwrap() + .send_request(request, true) + .unwrap(); + + let conn_fut = async move { + connection.await.expect("connection error"); + }; + + let response_fut = async move { + // Poll for informational responses + loop { + match poll_fn(|cx| response_future.poll_informational(cx)).await { + Some(Ok(info_response)) => { + assert_eq!(info_response.status(), StatusCode::EARLY_HINTS); + assert_eq!( + info_response.headers().get("link").unwrap(), + "; rel=preload" + ); + break; + } + Some(Err(e)) => panic!("Error polling informational: {:?}", e), + None => break, + } + } + + // Get the final response + let response = response_future.await.expect("response error"); + assert_eq!(response.status(), StatusCode::OK); + }; + + join(conn_fut, response_fut).await; + }; + + join(srv, client).await; +} + +#[tokio::test] +async fn informational_responses_with_body_streaming() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + client + .send_frame(frames::headers(1).request("POST", "https://example.com/")) + .await; + + // Expect 100 Continue + client + .recv_frame(frames::headers(1).response(StatusCode::CONTINUE)) + .await; + + client.send_frame(frames::data(1, &b"chunk1"[..])).await; + + // Expect 103 Early Hints while still receiving body + client + .recv_frame( + frames::headers(1) + .response(StatusCode::EARLY_HINTS) + .field("link", "; rel=preload"), + ) + .await; + + client + .send_frame(frames::data(1, &b"chunk2"[..]).eos()) + .await; + + // Expect final response with streaming body + client + .recv_frame(frames::headers(1).response(StatusCode::OK)) + .await; + + client + .recv_frame(frames::data(1, &b"response data"[..]).eos()) + .await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::POST); + + // Send 100 Continue + let continue_response = Response::builder() + .status(StatusCode::CONTINUE) + .body(()) + .unwrap(); + stream.send_informational(continue_response).unwrap(); + + // Send 103 Early Hints while processing + let early_hints_response = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload") + .body(()) + .unwrap(); + stream.send_informational(early_hints_response).unwrap(); + + // Send final response with body + let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); + let mut send_stream = stream.send_response(rsp, false).unwrap(); + + send_stream.send_data("response data".into(), true).unwrap(); + + assert!(srv.next().await.is_none()); + }; + + join(client, srv).await; +}