feat(guardrail): add whole replay for streaming chat #108
📝 Walkthrough
Buffers upstream SSE chunks until a whole-response guardrail check runs on the collected payload; on approval the buffered chunks are replayed, on rejection an SSE error is emitted. Adds collector output extraction, adapter hooks, driver types, SSE wiring, and tests for blocked/approved streamed flows.
Changes: Streaming Output Guardrails with Replay
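The buffer-then-replay flow in the walkthrough can be pictured with a minimal sketch. It assumes heavily simplified types: `ReplayBuffer`, `Verdict`, and `finalize` are hypothetical names that do not come from the PR, chunks are plain strings, and the guardrail is a closure rather than the real async check.

```rust
/// Hypothetical verdict of a whole-response guardrail check.
#[derive(Debug, Clone, PartialEq)]
pub enum Verdict {
    Allow,
    Block { reason: String },
}

/// Holds upstream SSE chunks until the whole response has been checked.
#[derive(Default)]
pub struct ReplayBuffer {
    chunks: Vec<String>,
}

impl ReplayBuffer {
    /// Record one upstream chunk instead of forwarding it to the client.
    pub fn push(&mut self, chunk: &str) {
        self.chunks.push(chunk.to_string());
    }

    /// Run the check on the collected text, then either replay the buffered
    /// chunks in their original order or emit a single SSE error event.
    /// The error event format here is illustrative only and assumes the
    /// reason contains no newlines.
    pub fn finalize<F>(self, check: F) -> Vec<String>
    where
        F: FnOnce(&str) -> Verdict,
    {
        let collected: String = self.chunks.concat();
        match check(&collected) {
            Verdict::Allow => self.chunks,
            Verdict::Block { reason } => {
                vec![format!("event: error\ndata: {reason}\n\n")]
            }
        }
    }
}
```

In this sketch nothing reaches the client before `finalize` runs, which is the property the whole-response check relies on; the cost is that the client sees no output until the upstream stream has ended.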
GuardrailOutcome::Block { reason } => {
    return Err(GatewayError::Validation(format!(
        "guardrail {} blocked output: {}",
        guardrail.name(),
        reason
    ))
    .into());
@coderabbitai Error handling should support both a strict rejection mode that returns a 400 Bad Request status code and a more lenient mode that returns a simulated "Let's change the subject" response.
This will be implemented in a future pull request; please create an issue to track it.
🧩 Analysis chain
Sure, I'll create a tracking issue for that now.
🏁 Script executed:
gh issue create \
--repo api7/aisix \
--title "Streaming output guardrails: support strict (400) and lenient (topic-change) rejection modes" \
--body "## Summary
Currently, \`apply_stream_output_guardrails\` in \`src/proxy/handlers/format_handler.rs\` only supports a single hard-rejection path that returns a \`GatewayError::Validation\` error. Error handling should be extended to support two configurable modes:
1. **Strict mode** – return a \`400 Bad Request\` status code when the guardrail blocks streaming output.
2. **Lenient mode** – instead of an error, return a simulated \"Let's change the subject\" (topic-deflection) response, giving a softer guardrail experience.
## Background
Raised during review of PR `#108` (comment: https://github.com/api7/aisix/pull/108#discussion_r3258681325) as a follow-up to the streaming output guardrail implementation.
## Requested by
`@bzp2010`
" \
--assignee bzp2010
Length of output: 41
@bzp2010, I've created a tracking issue for this: #109 — "Streaming output guardrails: support strict (400) and lenient (topic-change) rejection modes", assigned to you. It captures both the strict (400 Bad Request) and lenient (topic-deflection) modes, along with a backlink to this PR and comment.
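For the tracking issue, the two rejection modes might look roughly like the sketch below. Everything in it is an assumption for discussion: `RejectionMode`, `Rejection`, and `reject` are hypothetical names, and returning status 200 in lenient mode is a guess, since the request only says the lenient path should return a simulated reply instead of an error.

```rust
/// Hypothetical setting for how a blocked output is reported to the client.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RejectionMode {
    /// Fail the request with 400 Bad Request.
    Strict,
    /// Answer with a canned topic-deflection message instead of an error.
    Lenient,
}

/// Simplified response: an HTTP status code plus a body.
#[derive(Debug, PartialEq)]
pub struct Rejection {
    pub status: u16,
    pub body: String,
}

/// Map a guardrail block onto a response according to the configured mode.
pub fn reject(mode: RejectionMode, guardrail: &str, reason: &str) -> Rejection {
    match mode {
        RejectionMode::Strict => Rejection {
            status: 400,
            body: format!("guardrail {guardrail} blocked output: {reason}"),
        },
        // Assumption: the lenient reply is delivered as a normal 200 response.
        RejectionMode::Lenient => Rejection {
            status: 200,
            body: "Let's change the subject.".to_string(),
        },
    }
}
```

Note that the lenient branch deliberately drops the guardrail name and reason, so the caller cannot tell which rule fired.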
🧬 Code Graph Analysis Results
- File: src/utils/future.rs, lines 10-19
    pub async fn maybe_timeout<F, T>(dur: Option<Duration>, fut: F) -> Result<T, Elapsed>
    where
        F: Future<Output = T>,
    {
        match dur {
            Some(d) if d.is_zero() => Ok(fut.await),
            Some(d) => tokio::time::timeout(d, fut).await,
            None => Ok(fut.await),
        }
    }
- File: src/proxy/guardrails.rs, lines 50-53
    async fn check(
        &self,
        payload: &GuardrailCheckPayload,
    ) -> Result<Option<GuardrailOutcome>, GatewayError>;
- File: src/proxy/guardrails.rs, lines 82-101
    async fn check(
        &self,
        payload: &GuardrailCheckPayload,
    ) -> Result<Option<GuardrailOutcome>, GatewayError> {
        if !self.supports_stage(payload.stage()) {
            return Ok(None);
        }
        self.runtime
            .check(payload, &self.config)
            .await
            .map(Some)
            .map_err(|error| {
                GatewayError::Internal(format!(
                    "guardrail {} failed: {}",
                    self.runtime.name(),
                    error
                ))
            })
    }
- File: src/proxy/guardrails.rs, lines 424-431
    async fn check(
        &self,
        payload: &GuardrailCheckPayload,
        _config: &(),
    ) -> Result<GuardrailOutcome, Self::Error> {
        self.seen_payloads.lock().unwrap().push(payload.clone());
        Ok(self.outcome.clone())
    }
- File: src/proxy/handlers/messages/mod.rs, lines 75-77
    fn span_name() -> &'static str {
        "aisix.llm.messages"
    }
- File: src/proxy/handlers/messages/mod.rs, lines 79-81
    fn missing_model_error() -> Self::Error {
        MessagesError::MissingModelInContext
    }
- File: src/proxy/handlers/messages/mod.rs, lines 83-85
    fn set_model(request: &mut Self::Request, model: String) {
        request.model = model;
    }
- File: src/proxy/handlers/messages/mod.rs, lines 87-93
    fn request_span_properties(
        request: &Self::Request,
        provider: &dyn ProviderCapabilities,
        base_url: Option<&Url>,
    ) -> Vec<(String, String)> {
        request_span_properties(request, provider, base_url)
    }
- File: src/proxy/handlers/messages/mod.rs, lines 95-97
    fn response_span_properties(response: &Self::Response, usage: &Usage) -> Vec<(String, String)> {
        response_span_properties(response, usage)
    }
- File: src/proxy/handlers/messages/mod.rs, lines 99-101
    fn apply_chunk_span_properties(span: &Span, chunk: &Self::StreamChunk, _is_first_item: bool) {
        span.add_properties(|| chunk_span_properties(chunk));
    }
- File: src/proxy/handlers/responses/mod.rs, lines 69-71
    fn span_name() -> &'static str {
        "aisix.llm.responses"
    }
- File: src/proxy/handlers/responses/mod.rs, lines 73-75
    fn missing_model_error() -> Self::Error {
        ResponsesError::MissingModelInContext
    }
- File: src/proxy/handlers/responses/mod.rs, lines 77-79
    fn set_model(request: &mut Self::Request, model: String) {
        request.model = model;
    }
- File: src/proxy/handlers/responses/mod.rs, lines 81-87
    fn request_span_properties(
        request: &Self::Request,
        provider: &dyn ProviderCapabilities,
        base_url: Option<&Url>,
    ) -> Vec<(String, String)> {
        request_span_properties(request, provider, base_url)
    }
- File: src/proxy/handlers/responses/mod.rs, lines 89-91
    fn response_span_properties(response: &Self::Response, usage: &Usage) -> Vec<(String, String)> {
        response_span_properties(response, usage)
    }
- File: src/proxy/handlers/responses/mod.rs, lines 93-95
    fn apply_chunk_span_properties(span: &Span, chunk: &Self::StreamChunk, _is_first_item: bool) {
        span.add_properties(|| chunk_span_properties(chunk));
    }
- File: src/proxy/handlers/responses/mod.rs, lines 97-99
    fn starts_output(chunk: &Self::StreamChunk) -> bool {
        event_starts_output(chunk)
    }
- File: src/proxy/handlers/responses/mod.rs, lines 101-103
    fn record_stream_item(collector: &mut Self::Collector, chunk: &Self::StreamChunk) {
        collector.record_event(chunk);
    }
- File: src/proxy/handlers/responses/mod.rs, lines 105-107
    fn output_message_span_properties(collector: &Self::Collector) -> Vec<(String, String)> {
        collector.output_message_span_properties()
    }
- File: src/proxy/handlers/responses/mod.rs, lines 109-118
    fn guardrail_input_payload(
        lifecycle_state: &Self::LifecycleState,
        _request: &Self::Request,
    ) -> Result<Option<crate::guardrail::traits::GuardrailCheckPayload>, Self::Error> {
        let payload =
            input_guardrail_payload_from_chat_messages(&lifecycle_state.merged_input_messages)
                .map(crate::guardrail::traits::GuardrailCheckPayload::Input)
                .map_err(bridge_error)?;
        Ok(Some(payload))
    }
- File: src/proxy/handlers/responses/mod.rs, lines 120-131
    fn apply_input_guardrail_rewrite(
        lifecycle_state: &mut Self::LifecycleState,
        request: &mut Self::Request,
        rewrite: crate::guardrail::traits::GuardrailCheckPayload,
    ) -> Result<(), Self::Error> {
        let messages = input_payload_to_chat_messages(
            &input_payload_from_check_payload(rewrite).map_err(bridge_error)?,
        )
        .map_err(bridge_error)?;
        rewrite_response_from_messages(request, &messages)?;
        Ok(())
    }
- File: src/proxy/guardrails.rs, lines 424-431 (summarized): Guardrail stream output payload handling snippet (condensed for brevity).
Summary: guardrail_stream_output_payload(lifecycle_state, collector) -> Result<Option, Self::Error>; obtains payload from collector.output_messages() and returns Some(payload) or None.
Note: If you want, I can collapse any of the long sections into a single summarized entry with explicit behavior instead of listing every sub-snippet.
🧠 Learnings used
Learnt from: bzp2010
Repo: api7/aisix PR: 34
File: src/proxy/hooks2/authorization/mod.rs:1-1
Timestamp: 2026-04-10T09:35:05.745Z
Learning: In Rust code using the `anyhow` crate, treat `use anyhow::Result;` as potentially used even if the code uses non-default error types like `Result<T, SomeOtherError>` (i.e., `Result<T, E>` where `E` is not `anyhow::Error`). Because `anyhow::Result` is an alias for `std::result::Result<T, E=anyhow::Error>`, the import is referenced by the `Result<..., ...>` syntax. Do not flag `use anyhow::Result;` as unused solely due to the presence of `Result<T, SomeOtherError>` usages.
Learnt from: AlinsRan
Repo: api7/aisix PR: 46
File: src/lib.rs:122-126
Timestamp: 2026-04-17T09:32:54.079Z
Learning: Avoid adding `#[fastrace::trace]` to top-level process-lifetime entrypoints in `src/lib.rs`—specifically `run()`, `run_with_config()`, and `run_with_provider()`. These functions cover the entire server lifetime (startup → shutdown), while Fastrace spans should be used for request-scoped boundaries (e.g., HTTP/RPC handler functions) rather than global process entrypoints.
Learnt from: bzp2010
Repo: api7/aisix PR: 80
File: src/main.rs:9-9
Timestamp: 2026-05-03T12:45:26.883Z
Learning: In this repository (api7/aisix), `aisix::utils::instance::init()` is intentionally **not** guaranteed to be called by the library entrypoints (`src/lib.rs` via `run()`, `run_with_config()`, `run_with_provider()`). Treat it as the embedder/binary’s responsibility to call `instance::init()` before handling any requests. Also note that `instance_id()` and `run_id()` in `src/utils/instance/mod.rs` intentionally panic (via `.expect()`) when the `OnceLock`s are uninitialized—this is by design. During code review, do **not** flag missing `instance::init()` wiring inside the library startup path as a bug and do **not** suggest moving `instance::init()` into the library automatically.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/proxy/handlers/format_handler.rs`:
- Around line 826-845: The code currently treats a None from
A::guardrail_stream_output_payload as Ok(()) and thereby implicitly allows
output when guardrails are enabled; change the match so that when
lifecycle_state is Some but guardrail_stream_output_payload returns None you
return an Err (a guardrail violation/error) instead of Ok(()) so
apply_stream_output_guardrails is not skipped. Specifically, in the
WholeResponseReplayFinalize::NeedsGuardrailCheck branch, replace the fallback
that maps None -> Ok(()) with an explicit Err describing a missing guardrail
payload (or map to the existing guardrail error type), ensuring that
A::guardrail_stream_output_payload returning Err or None both produce an Err
that prevents silent bypass; keep the use of lifecycle_state,
guardrail_output_collector, apply_stream_output_guardrails, and
configured_guardrails as the surrounding context.
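The fail-closed behavior requested above can be sketched in isolation. This is a simplified illustration, not the code in `format_handler.rs`: `StreamGuardrailError` and `require_payload` are hypothetical names, the payload is reduced to a `String`, and a boolean stands in for "guardrails are configured and lifecycle state is present".

```rust
/// Hypothetical error for the stream-output guardrail stage.
#[derive(Debug, PartialEq)]
pub enum StreamGuardrailError {
    /// Guardrails are configured but the adapter produced no payload to check.
    MissingPayload,
}

/// Fail closed: when guardrails are configured, a missing payload is an error
/// rather than an implicit approval of the buffered output.
pub fn require_payload(
    guardrails_configured: bool,
    payload: Option<String>,
) -> Result<Option<String>, StreamGuardrailError> {
    match (guardrails_configured, payload) {
        (true, None) => Err(StreamGuardrailError::MissingPayload),
        // Either a payload exists, or no guardrail would have run anyway.
        (_, payload) => Ok(payload),
    }
}
```

With this shape, the only way to reach the replay step with guardrails enabled is to have handed a payload to the guardrail loop first.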
📒 Files selected for processing (12)
- src/proxy/guardrails.rs
- src/proxy/guardrails/streaming.rs
- src/proxy/handlers/chat_completions/mod.rs
- src/proxy/handlers/chat_completions/span_attributes/stream_output.rs
- src/proxy/handlers/format_handler.rs
- src/proxy/handlers/messages/mod.rs
- src/proxy/handlers/messages/span_attributes/stream_output.rs
- src/proxy/handlers/responses/mod.rs
- src/proxy/handlers/responses/span_attributes/stream_output.rs
- tests/proxy/guardrail/chat-completions.test.ts
- tests/proxy/guardrail/messages.test.ts
- tests/proxy/guardrail/responses.test.ts
GuardrailOutcome::Rewrite(_) => {
    return Err(GatewayError::Validation(format!(
        "guardrail {} requested streaming output rewrite, which is not supported yet",
        guardrail.name()
    ))
    .into());
}
@coderabbitai Regarding this currently unsupported feature, please create an issue to track it; we plan to implement it in the future.
🧩 Analysis chain
Sure! Let me create a tracking issue for this now.
🏁 Script executed:
#!/bin/bash
gh issue create \
--repo api7/aisix \
--title "feat(guardrail): support streaming output rewrite" \
--body "## Summary
When a guardrail returns a \`Rewrite\` outcome for a **streaming** response, the proxy currently rejects the request with an error:
> \`guardrail {name} requested streaming output rewrite, which is not supported yet\`
This is a placeholder until whole-response streaming rewrite is implemented.
## Context
- Introduced in PR `#108` (feat(guardrail): add whole replay for stream request), specifically in \`apply_stream_output_guardrails\` in \`src/proxy/handlers/format_handler.rs\`.
- The non-streaming (complete-response) path already supports rewrite via \`apply_output_guardrail_rewrite\`.
## Goal
Implement support for \`GuardrailOutcome::Rewrite\` in the streaming output guardrail path, so that guardrail-driven rewrites can be applied to buffered/replayed stream content.
## References
- PR: https://github.com/api7/aisix/pull/108
- Comment: https://github.com/api7/aisix/pull/108#discussion_r3258689929
/cc `@bzp2010`" \
--assignee bzp2010
Length of output: 41
@bzp2010, I've created a tracking issue for this: #110 — feat(guardrail): support streaming output rewrite. It's assigned to you and includes context linking back to this PR and comment.
🧬 Code Graph Analysis Results
[src/proxy/handlers/format_handler.rs] – apply_stream_output_guardrails — summary
- Description: Iterates over configured guardrails and, for each, if a payload is available and a guardrail outcome is returned, handles the outcome.
- Parameters:
- guardrails: &[Box]
- payload: Option
- Returns: Result<(), A::Error>
- Important behavior:
- If GuardrailOutcome::Rewrite(payload) is encountered, returns a GatewayError::Validation indicating streaming output rewrite is not supported yet.
- If GuardrailOutcome::Block { reason } is encountered, returns a GatewayError::Validation with the blocking reason.
- Allows GuardrailOutcome::Allow to pass, and otherwise continues processing.
- Notable detail: The error message includes the guardrail name.
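The outcome handling summarized above could be sketched as follows. This is a synchronous simplification under stated assumptions: `Outcome` and `apply_outcomes` are hypothetical stand-ins, each guardrail is reduced to a name plus an already-computed result, errors are plain strings, and `None` models a guardrail that does not run at this stage.

```rust
/// Simplified stand-in for the guardrail outcome type.
#[derive(Debug, Clone, PartialEq)]
pub enum Outcome {
    Allow,
    Block { reason: String },
    Rewrite(String),
}

/// Walk the guardrail results in order and stop at the first one that
/// blocks the output or asks for a rewrite.
pub fn apply_outcomes(outcomes: &[(&str, Option<Outcome>)]) -> Result<(), String> {
    for (name, outcome) in outcomes {
        match outcome {
            // Skipped or approving guardrails let the loop continue.
            None | Some(Outcome::Allow) => continue,
            Some(Outcome::Block { reason }) => {
                return Err(format!("guardrail {name} blocked output: {reason}"));
            }
            Some(Outcome::Rewrite(_)) => {
                return Err(format!(
                    "guardrail {name} requested streaming output rewrite, which is not supported yet"
                ));
            }
        }
    }
    Ok(())
}
```

Because the loop returns on the first failing guardrail, later guardrails are never consulted once one of them rejects the output.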
[src/proxy/handlers/format_handler.rs] – handle_stream_response — summary
- Description: Handles streaming responses; coordinates streaming guardrail checks and finalization.
- Context: When upstream finishes, optional streaming guardrail payload is prepared via A::guardrail_stream_output_payload and then passed to apply_stream_output_guardrails.
- Behavior:
- If lifecycle_state.as_ref() is present, obtains a streaming payload and runs apply_stream_output_guardrails with it.
- Propagates any errors from guardrail processing.
- On success, continues to finalize the streaming response or emits appropriate SSE events.
- Notable detail: This path shows where the unsupported streaming-output rewrite would surface (through apply_stream_output_guardrails).
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/proxy/handlers/format_handler.rs (1)
127-132: ⚡ Quick win: Make this hook explicit instead of defaulting to `Ok(None)`.
This default lets a new adapter compile cleanly even though guarded streams will fail later via `require_stream_output_guardrail_payload`. Requiring an explicit implementation here, or at least documenting that `None` is only valid when no stream-output guardrail can run, would surface the contract break much earlier.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/proxy/handlers/format_handler.rs` around lines 127 - 132, The method guardrail_stream_output_payload currently returns Ok(None) by default which hides missing implementations and causes later failures in require_stream_output_guardrail_payload; remove the default body so implementors must provide an explicit implementation (i.e. make the trait method declaration abstract/without a default impl) or, if you cannot remove the default, replace Ok(None) with a clear error return (e.g. an Err variant signaling "unsupported/missing stream-output guardrail") so the contract is enforced at compile/runtime; update references to guardrail_stream_output_payload and require_stream_output_guardrail_payload accordingly.
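To illustrate the nitpick, here is a minimal sketch of a hook declared without a default body. The trait `StreamAdapter` and both adapters are hypothetical, and the signature is simplified to strings; only the method name `guardrail_stream_output_payload` is taken from the review.

```rust
/// Hypothetical, simplified adapter trait.
pub trait StreamAdapter {
    /// No default body: every adapter must state how it builds the payload,
    /// so a forgotten implementation is a compile error, not a runtime one.
    fn guardrail_stream_output_payload(
        &self,
        collected: &[String],
    ) -> Result<Option<String>, String>;
}

/// An adapter that supports stream-output guardrails.
pub struct ChatAdapter;

impl StreamAdapter for ChatAdapter {
    fn guardrail_stream_output_payload(
        &self,
        collected: &[String],
    ) -> Result<Option<String>, String> {
        // Join the buffered chunks into one payload for the whole-response check.
        Ok(Some(collected.concat()))
    }
}

/// An adapter that cannot produce a payload says so explicitly.
pub struct UnguardedAdapter;

impl StreamAdapter for UnguardedAdapter {
    fn guardrail_stream_output_payload(
        &self,
        _collected: &[String],
    ) -> Result<Option<String>, String> {
        Err("stream-output guardrails are not supported by this adapter".to_string())
    }
}
```

The second adapter shows the fallback option from the comment: when a payload cannot be built, returning an explicit error keeps the contract visible instead of hiding it behind `Ok(None)`.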
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/proxy/handlers/format_handler.rs`:
- Around line 735-744: The first-token telemetry is recorded too early (on the
first upstream chunk that may be buffered) because the now_starts_output /
first_output_arrived check using A::starts_output(&chunk) is inside the upstream
chunk handling; move the first_output_arrived flag flip, the
hooks::observability::record_first_token_latency(&mut request_ctx).await call,
and the span.add_event(TraceEvent::new("first token arrived")...) into the
actual client emit paths (i.e., the functions/methods that perform the real
emit/send to the guarded streams or client) so the telemetry fires only when a
chunk is emitted to the caller; update any guarded stream emit helpers to
check/set first_output_arrived and call record_first_token_latency there and
remove the early recording in the upstream handler.
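A rough sketch of recording first-token latency at emit time rather than at upstream arrival is shown below. `GuardedEmitter` and its fields are hypothetical, and a stored `Duration` stands in for the real `record_first_token_latency` hook and the trace event.

```rust
use std::time::{Duration, Instant};

/// Hypothetical emitter that owns the first-token bookkeeping.
pub struct GuardedEmitter {
    started: Instant,
    first_output_arrived: bool,
    /// Set once, when the first output-bearing chunk is sent to the client.
    pub first_token_latency: Option<Duration>,
    /// Chunks that were actually sent, in order.
    pub sent: Vec<String>,
}

impl GuardedEmitter {
    pub fn new() -> Self {
        Self {
            started: Instant::now(),
            first_output_arrived: false,
            first_token_latency: None,
            sent: Vec::new(),
        }
    }

    /// Called only when a chunk is really sent to the client, so chunks that
    /// sit in the replay buffer never trigger the telemetry.
    pub fn emit(&mut self, chunk: String, starts_output: bool) {
        if starts_output && !self.first_output_arrived {
            self.first_output_arrived = true;
            self.first_token_latency = Some(self.started.elapsed());
        }
        self.sent.push(chunk);
    }
}
```

Under this arrangement the measured latency includes the time spent buffering and checking the response, which is what the client actually experiences.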
📒 Files selected for processing (1)
src/proxy/handlers/format_handler.rs