
feat(guardrail): add whole replay for streaming chat#108

Merged
bzp2010 merged 3 commits into main from bzp/feat-guradrail-stream-whole-replay
May 18, 2026

Conversation

Collaborator

@bzp2010 bzp2010 commented May 18, 2026

Summary by CodeRabbit

  • New Features

    • Streamed-response guardrails: streamed outputs are validated; disallowed streams emit SSE error events, approved streams replay original upstream chunks to clients.
    • Streamed output is now exposed as OpenAI-style chat messages for span/usage reporting; format handlers can provide a guardrail payload for streamed output.
  • Tests

    • Added end-to-end tests covering guardrail blocking and replay behavior for chat completions, messages, and responses (SSE and Anthropic formats).
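To make "emit SSE error events" concrete, here is a minimal sketch of what such events could look like on the wire. The helper names, the `invalid_request_error` type string, and the exact JSON shapes are assumptions for illustration; the PR's own helpers may differ. The only intent is to show the difference between an unnamed `data:` event (OpenAI-style) and a named `event: error` frame (Anthropic-style).

```rust
/// Minimal JSON string escaping, sufficient for this illustration only.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Hypothetical OpenAI-style frame: an unnamed event carrying an `error` object.
fn openai_sse_error(message: &str) -> String {
    let body = format!(
        r#"{{"error":{{"message":"{}","type":"invalid_request_error"}}}}"#,
        json_escape(message)
    );
    format!("data: {}\n\n", body)
}

/// Hypothetical Anthropic-style frame: a named `error` event.
fn anthropic_sse_error(message: &str) -> String {
    let body = format!(
        r#"{{"type":"error","error":{{"type":"invalid_request_error","message":"{}"}}}}"#,
        json_escape(message)
    );
    format!("event: error\ndata: {}\n\n", body)
}
```

Each frame ends with a blank line, which is what terminates an event in the SSE framing.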

Review Change Stack

@coderabbitai

coderabbitai Bot commented May 18, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5911e809-6d02-4db4-80dc-2f473402e3ed

📥 Commits

Reviewing files that changed from the base of the PR and between c93173a and 6b744f3.

📒 Files selected for processing (1)
  • src/proxy/handlers/format_handler.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/proxy/handlers/format_handler.rs

📝 Walkthrough

Buffers upstream SSE chunks until a whole-response guardrail check runs on the collected payload; on approval the buffered chunks are replayed, on rejection an SSE error is emitted. Adds collector output extraction, adapter hooks, driver types, SSE wiring, and tests for blocked/approved streamed flows.
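The buffer-then-decide flow described above can be sketched in a few lines. This is a simplified, synchronous stand-in rather than the PR's code: `Verdict`, `guard_stream`, and the plain-string chunks are hypothetical names, and the real handler works on an async SSE stream.

```rust
/// Hypothetical result of the whole-response check.
enum Verdict {
    Allow,
    Block(String),
}

/// Hold back every upstream chunk, run one check on the concatenated text,
/// then either replay the original chunks unchanged or emit a single error.
fn guard_stream<I, F>(upstream: I, check: F) -> Vec<String>
where
    I: IntoIterator<Item = String>,
    F: FnOnce(&str) -> Verdict,
{
    // Nothing reaches the client while we are still collecting.
    let buffered: Vec<String> = upstream.into_iter().collect();
    // The check sees the whole response, so content split across chunks is visible.
    let whole: String = buffered.concat();
    match check(&whole) {
        Verdict::Allow => buffered,
        Verdict::Block(reason) => vec![format!("error: {}", reason)],
    }
}
```

The point of checking the concatenated text is that a phrase split across two chunks (for example `"Hel"` and `"lo"`) is still seen as one string by the guardrail.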

Changes

Streaming Output Guardrails with Replay

| Layer / File(s) | Summary |
|---|---|
| **Streaming Replay Infrastructure**<br>`src/proxy/guardrails.rs`, `src/proxy/guardrails/streaming.rs` | Introduces `StreamCheckpoint`, `StreamGuardrailDecision`, `WholeResponseReplay<Chunk>`, and `WholeResponseReplayDriver<Chunk>` to manage buffering state and replay queues. The driver conditionally buffers or passes through upstream chunks, signals that a guardrail check is needed on completion when buffered, approves and drains chunks to the queue, and exposes status helpers. Unit tests verify pass-through and buffered replay modes. |
| **Format Handler Streaming Integration**<br>`src/proxy/handlers/format_handler.rs` | Extends `FormatHandlerAdapter` with a `guardrail_stream_output_payload` hook (default `Ok(None)`), tightens `StreamChunk` to require `Clone`, and refactors `handle_stream_response` to initialize a replay driver (when output guardrails exist), thread it through the SSE unfold state, integrate replay-driven chunk emission and buffering in the streaming loop, and evaluate the complete buffered payload at upstream completion. New helpers apply streaming-output guardrails, enforce payload presence, compute failure-only span properties, and record first-token latency. Unit tests cover helper behavior. |
| **Adapter Implementations and Tests**<br>`src/proxy/handlers/chat_completions/mod.rs`, `src/proxy/handlers/chat_completions/span_attributes/stream_output.rs`, `src/proxy/handlers/messages/mod.rs`, `src/proxy/handlers/messages/span_attributes/stream_output.rs`, `src/proxy/handlers/responses/mod.rs`, `src/proxy/handlers/responses/span_attributes/stream_output.rs`, `tests/proxy/guardrail/*` | `ChatCompletionsAdapter`, `MessagesAdapter`, and `ResponsesAdapter` implement `guardrail_stream_output_payload` (building the payload from `collector.output_messages()`), add SSE OpenAI/Anthropic error event helpers, add `output_messages()` on stream collectors, and include Jest tests that parse SSE output and assert both blocked and approved streamed output flows across endpoints. |
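As a rough illustration of the driver behaviour summarized in the table (pass through when no output guardrails exist, otherwise hold chunks until approval), here is a small state holder. `ReplayDriver` and `Finalize` are hypothetical names chosen for this sketch and are not the definitions of `WholeResponseReplayDriver<Chunk>` from the PR, whose fields and methods are not shown in this conversation.

```rust
use std::collections::VecDeque;

/// What the caller should do once upstream has finished (hypothetical).
#[derive(Debug, PartialEq)]
enum Finalize {
    Done,
    NeedsGuardrailCheck,
}

/// Hypothetical driver: buffers when guardrails are configured, otherwise passes through.
struct ReplayDriver<Chunk> {
    buffering: bool,
    held: Vec<Chunk>,       // chunks withheld from the client
    ready: VecDeque<Chunk>, // chunks cleared for emission
}

impl<Chunk> ReplayDriver<Chunk> {
    fn new(has_output_guardrails: bool) -> Self {
        Self { buffering: has_output_guardrails, held: Vec::new(), ready: VecDeque::new() }
    }

    /// Accept one upstream chunk: hold it while buffering, else queue it for emission.
    fn push(&mut self, chunk: Chunk) {
        if self.buffering {
            self.held.push(chunk);
        } else {
            self.ready.push_back(chunk);
        }
    }

    /// Called when upstream completes; a buffering driver asks for a guardrail check.
    fn finish_upstream(&self) -> Finalize {
        if self.buffering { Finalize::NeedsGuardrailCheck } else { Finalize::Done }
    }

    /// Guardrails approved: move every held chunk, in order, to the ready queue.
    fn approve(&mut self) {
        self.ready.extend(self.held.drain(..));
        self.buffering = false;
    }

    /// Guardrails rejected: drop the held chunks so they can never be emitted.
    fn reject(&mut self) {
        self.held.clear();
        self.buffering = false;
    }

    /// Next chunk that may be sent to the client, if any.
    fn next_ready(&mut self) -> Option<Chunk> {
        self.ready.pop_front()
    }
}
```

Keeping the held chunks separate from the ready queue makes it structurally impossible to emit a chunk before `approve` has been called.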

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • api7/aisix#92: Related streaming handler refactor that this PR builds upon.
  • api7/aisix#104: Closely related guardrail infrastructure changes extended by this PR.
  • api7/aisix#93: Prior SSE control-flow changes in the format handler that intersect with this PR's streaming integration.
🚥 Pre-merge checks | ✅ 6
✅ Passed checks (6 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat(guardrail): add whole replay for streaming chat' directly and clearly describes the main change: adding a whole-response replay buffering mechanism for streaming chat guardrails across multiple format handlers. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| E2e Test Quality Review | ✅ Passed | E2E tests are comprehensive with real servers and proper error assertions. Code has no panics/unwraps, proper error propagation, thread-safe design. |
| Security Check | ✅ Passed | Security review complete. No critical/high vulnerabilities found. Error messages are generic, no hardcoded secrets, no TLS issues, no auth bypasses, no unencrypted storage of credentials. |


Comment on lines +412 to +418
GuardrailOutcome::Block { reason } => {
    return Err(GatewayError::Validation(format!(
        "guardrail {} blocked output: {}",
        guardrail.name(),
        reason
    ))
    .into());
Collaborator Author


@coderabbitai Error handling should support both a strict rejection mode that returns a 400 Bad Request status code and a more lenient mode that returns a simulated "Let's change the subject" response.

This will be implemented in a future pull request; please create an issue to track it.
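One possible shape for the two modes requested here, purely as a sketch: `RejectionMode`, `Rejection`, and `reject` are hypothetical names, nothing like them exists in this PR, and how a 400 status would interact with an SSE response that has already started is left open.

```rust
/// Hypothetical configuration knob for blocked output.
enum RejectionMode {
    Strict,
    Lenient,
}

/// What the gateway would send back to the client (hypothetical).
#[derive(Debug, PartialEq)]
enum Rejection {
    HttpError { status: u16, message: String },
    CannedReply { content: String },
}

/// Map a guardrail block to a client-facing result according to the mode.
fn reject(mode: &RejectionMode, guardrail: &str, reason: &str) -> Rejection {
    match mode {
        // Strict: surface the block as a 400 Bad Request.
        RejectionMode::Strict => Rejection::HttpError {
            status: 400,
            message: format!("guardrail {} blocked output: {}", guardrail, reason),
        },
        // Lenient: hide the block behind a simulated assistant reply.
        RejectionMode::Lenient => Rejection::CannedReply {
            content: "Let's change the subject.".to_string(),
        },
    }
}
```

In lenient mode the reason is deliberately not echoed to the client; it would presumably still be recorded in spans or logs.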


🧩 Analysis chain

Sure, I'll create a tracking issue for that now.


🏁 Script executed:

gh issue create \
  --repo api7/aisix \
  --title "Streaming output guardrails: support strict (400) and lenient (topic-change) rejection modes" \
  --body "## Summary

Currently, \`apply_stream_output_guardrails\` in \`src/proxy/handlers/format_handler.rs\` only supports a single hard-rejection path that returns a \`GatewayError::Validation\` error. Error handling should be extended to support two configurable modes:

1. **Strict mode** – return a \`400 Bad Request\` status code when the guardrail blocks streaming output.
2. **Lenient mode** – instead of an error, return a simulated \"Let's change the subject\" (topic-deflection) response, giving a softer guardrail experience.

## Background

Raised during review of PR `#108` (comment: https://github.com/api7/aisix/pull/108#discussion_r3258681325) as a follow-up to the streaming output guardrail implementation.

## Requested by

`@bzp2010`
" \
  --assignee bzp2010

Length of output: 41


@bzp2010, I've created a tracking issue for this: #109, "Streaming output guardrails: support strict (400) and lenient (topic-change) rejection modes", assigned to you. It captures both the strict (400 Bad Request) and lenient (topic-deflection) modes, along with a backlink to this PR and comment.


🧬 Code Graph Analysis Results
  • File: src/utils/future.rs, lines 10-19
pub async fn maybe_timeout<F, T>(dur: Option<Duration>, fut: F) -> Result<T, Elapsed>
where
    F: Future<Output = T>,
{
    match dur {
        Some(d) if d.is_zero() => Ok(fut.await),
        Some(d) => tokio::time::timeout(d, fut).await,
        None => Ok(fut.await),
    }
}
  • File: src/proxy/guardrails.rs, lines 50-53
async fn check(
        &self,
        payload: &GuardrailCheckPayload,
    ) -> Result<Option<GuardrailOutcome>, GatewayError>;
  • File: src/proxy/guardrails.rs, lines 82-101
async fn check(
        &self,
        payload: &GuardrailCheckPayload,
    ) -> Result<Option<GuardrailOutcome>, GatewayError> {
        if !self.supports_stage(payload.stage()) {
            return Ok(None);
        }

        self.runtime
            .check(payload, &self.config)
            .await
            .map(Some)
            .map_err(|error| {
                GatewayError::Internal(format!(
                    "guardrail {} failed: {}",
                    self.runtime.name(),
                    error
                ))
            })
    }
  • File: src/proxy/guardrails.rs, lines 424-431
async fn check(
            &self,
            payload: &GuardrailCheckPayload,
            _config: &(),
        ) -> Result<GuardrailOutcome, Self::Error> {
            self.seen_payloads.lock().unwrap().push(payload.clone());
            Ok(self.outcome.clone())
        }
  • File: src/proxy/handlers/messages/mod.rs, lines 75-77
fn span_name() -> &'static str {
        "aisix.llm.messages"
    }
  • File: src/proxy/handlers/messages/mod.rs, lines 79-81
fn missing_model_error() -> Self::Error {
        MessagesError::MissingModelInContext
    }
  • File: src/proxy/handlers/messages/mod.rs, lines 83-85
fn set_model(request: &mut Self::Request, model: String) {
        request.model = model;
    }
  • File: src/proxy/handlers/messages/mod.rs, lines 87-93
fn request_span_properties(
        request: &Self::Request,
        provider: &dyn ProviderCapabilities,
        base_url: Option<&Url>,
    ) -> Vec<(String, String)> {
        request_span_properties(request, provider, base_url)
    }
  • File: src/proxy/handlers/messages/mod.rs, lines 95-97
fn response_span_properties(response: &Self::Response, usage: &Usage) -> Vec<(String, String)> {
        response_span_properties(response, usage)
    }
  • File: src/proxy/handlers/messages/mod.rs, lines 99-101
fn apply_chunk_span_properties(span: &Span, chunk: &Self::StreamChunk, _is_first_item: bool) {
        span.add_properties(|| chunk_span_properties(chunk));
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 69-71
fn span_name() -> &'static str {
        "aisix.llm.responses"
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 73-75
fn missing_model_error() -> Self::Error {
        ResponsesError::MissingModelInContext
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 77-79
fn set_model(request: &mut Self::Request, model: String) {
        request.model = model;
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 81-87
fn request_span_properties(
        request: &Self::Request,
        provider: &dyn ProviderCapabilities,
        base_url: Option<&Url>,
    ) -> Vec<(String, String)> {
        request_span_properties(request, provider, base_url)
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 89-91
fn response_span_properties(response: &Self::Response, usage: &Usage) -> Vec<(String, String)> {
        response_span_properties(response, usage)
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 93-95
fn apply_chunk_span_properties(span: &Span, chunk: &Self::StreamChunk, _is_first_item: bool) {
        span.add_properties(|| chunk_span_properties(chunk));
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 97-99
fn starts_output(chunk: &Self::StreamChunk) -> bool {
        event_starts_output(chunk)
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 101-103
fn record_stream_item(collector: &mut Self::Collector, chunk: &Self::StreamChunk) {
        collector.record_event(chunk);
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 105-107
fn output_message_span_properties(collector: &Self::Collector) -> Vec<(String, String)> {
        collector.output_message_span_properties()
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 109-118
fn guardrail_input_payload(
        lifecycle_state: &Self::LifecycleState,
        _request: &Self::Request,
    ) -> Result<Option<crate::guardrail::traits::GuardrailCheckPayload>, Self::Error> {
        let payload =
            input_guardrail_payload_from_chat_messages(&lifecycle_state.merged_input_messages)
                .map(crate::guardrail::traits::GuardrailCheckPayload::Input)
                .map_err(bridge_error)?;
        Ok(Some(payload))
    }
  • File: src/proxy/handlers/responses/mod.rs, lines 120-131
fn apply_input_guardrail_rewrite(
        lifecycle_state: &mut Self::LifecycleState,
        request: &mut Self::Request,
        rewrite: crate::guardrail::traits::GuardrailCheckPayload,
    ) -> Result<(), Self::Error> {
        let messages = input_payload_to_chat_messages(
            &input_payload_from_check_payload(rewrite).map_err(bridge_error)?,
        )
        .map_err(bridge_error)?;
        rewrite_response_from_messages(request, &messages)?;
        Ok(())
    }
  • File: src/proxy/guardrails.rs, lines 424-431 (summarized): Guardrail stream output payload handling snippet (condensed for brevity).
    Summary: guardrail_stream_output_payload(lifecycle_state, collector) -> Result<Option, Self::Error>; obtains payload from collector.output_messages() and returns Some(payload) or None.

Note: If you want, I can collapse any of the long sections into a single summarized entry with explicit behavior instead of listing every sub-snippet.


🧠 Learnings used
Learnt from: bzp2010
Repo: api7/aisix PR: 34
File: src/proxy/hooks2/authorization/mod.rs:1-1
Timestamp: 2026-04-10T09:35:05.745Z
Learning: In Rust code using the `anyhow` crate, treat `use anyhow::Result;` as potentially used even if the code uses non-default error types like `Result<T, SomeOtherError>` (i.e., `Result<T, E>` where `E` is not `anyhow::Error`). Because `anyhow::Result` is an alias for `std::result::Result<T, E=anyhow::Error>`, the import is referenced by the `Result<..., ...>` syntax. Do not flag `use anyhow::Result;` as unused solely due to the presence of `Result<T, SomeOtherError>` usages.

Learnt from: AlinsRan
Repo: api7/aisix PR: 46
File: src/lib.rs:122-126
Timestamp: 2026-04-17T09:32:54.079Z
Learning: Avoid adding `#[fastrace::trace]` to top-level process-lifetime entrypoints in `src/lib.rs`—specifically `run()`, `run_with_config()`, and `run_with_provider()`. These functions cover the entire server lifetime (startup → shutdown), while Fastrace spans should be used for request-scoped boundaries (e.g., HTTP/RPC handler functions) rather than global process entrypoints.

Learnt from: bzp2010
Repo: api7/aisix PR: 80
File: src/main.rs:9-9
Timestamp: 2026-05-03T12:45:26.883Z
Learning: In this repository (api7/aisix), `aisix::utils::instance::init()` is intentionally **not** guaranteed to be called by the library entrypoints (`src/lib.rs` via `run()`, `run_with_config()`, `run_with_provider()`). Treat it as the embedder/binary’s responsibility to call `instance::init()` before handling any requests. Also note that `instance_id()` and `run_id()` in `src/utils/instance/mod.rs` intentionally panic (via `.expect()`) when the `OnceLock`s are uninitialized—this is by design. During code review, do **not** flag missing `instance::init()` wiring inside the library startup path as a bug and do **not** suggest moving `instance::init()` into the library automatically.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/proxy/handlers/format_handler.rs`:
- Around line 826-845: The code currently treats a None from
A::guardrail_stream_output_payload as Ok(()) and thereby implicitly allows
output when guardrails are enabled; change the match so that when
lifecycle_state is Some but guardrail_stream_output_payload returns None you
return an Err (a guardrail violation/error) instead of Ok(()) so
apply_stream_output_guardrails is not skipped. Specifically, in the
WholeResponseReplayFinalize::NeedsGuardrailCheck branch, replace the fallback
that maps None -> Ok(()) with an explicit Err describing a missing guardrail
payload (or map to the existing guardrail error type), ensuring that
A::guardrail_stream_output_payload returning Err or None both produce an Err
that prevents silent bypass; keep the use of lifecycle_state,
guardrail_output_collector, apply_stream_output_guardrails, and
configured_guardrails as the surrounding context.
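The fail-closed behaviour requested in this comment can be sketched as follows. `GuardError`, `require_payload`, and `finalize` are hypothetical stand-ins; the PR's real helper (referred to elsewhere in this conversation as `require_stream_output_guardrail_payload`) and its error type are not shown here.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum GuardError {
    MissingPayload,
}

/// Treat an absent payload as an error rather than as "nothing to check".
fn require_payload<P>(payload: Option<P>) -> Result<P, GuardError> {
    payload.ok_or(GuardError::MissingPayload)
}

/// Finalization step: both `Err(..)` and `Ok(None)` from the adapter stop the
/// stream before any guardrail is skipped.
fn finalize<P, F>(payload: Result<Option<P>, GuardError>, run_guardrails: F) -> Result<(), GuardError>
where
    F: FnOnce(&P) -> Result<(), GuardError>,
{
    let payload = require_payload(payload?)?;
    run_guardrails(&payload)
}
```

With this shape there is no branch in which guardrails are configured, the payload is missing, and the buffered output is still released.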

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e318f9eb-b9f9-481f-9c76-3ef1af967615

📥 Commits

Reviewing files that changed from the base of the PR and between dfb3bdb and f3d62c8.

📒 Files selected for processing (12)
  • src/proxy/guardrails.rs
  • src/proxy/guardrails/streaming.rs
  • src/proxy/handlers/chat_completions/mod.rs
  • src/proxy/handlers/chat_completions/span_attributes/stream_output.rs
  • src/proxy/handlers/format_handler.rs
  • src/proxy/handlers/messages/mod.rs
  • src/proxy/handlers/messages/span_attributes/stream_output.rs
  • src/proxy/handlers/responses/mod.rs
  • src/proxy/handlers/responses/span_attributes/stream_output.rs
  • tests/proxy/guardrail/chat-completions.test.ts
  • tests/proxy/guardrail/messages.test.ts
  • tests/proxy/guardrail/responses.test.ts

Comment thread src/proxy/handlers/format_handler.rs
Comment on lines +405 to +411
GuardrailOutcome::Rewrite(_) => {
    return Err(GatewayError::Validation(format!(
        "guardrail {} requested streaming output rewrite, which is not supported yet",
        guardrail.name()
    ))
    .into());
}
Collaborator Author


@coderabbitai Regarding this currently unsupported feature, please create an issue to track it; we plan to implement it in the future.


🧩 Analysis chain

Sure! Let me create a tracking issue for this now.


🏁 Script executed:

#!/bin/bash
gh issue create \
  --repo api7/aisix \
  --title "feat(guardrail): support streaming output rewrite" \
  --body "## Summary

When a guardrail returns a \`Rewrite\` outcome for a **streaming** response, the proxy currently rejects the request with an error:

> \`guardrail {name} requested streaming output rewrite, which is not supported yet\`

This is a placeholder until whole-response streaming rewrite is implemented.

## Context

- Introduced in PR `#108` (feat(guardrail): add whole replay for stream request), specifically in \`apply_stream_output_guardrails\` in \`src/proxy/handlers/format_handler.rs\`.
- The non-streaming (complete-response) path already supports rewrite via \`apply_output_guardrail_rewrite\`.

## Goal

Implement support for \`GuardrailOutcome::Rewrite\` in the streaming output guardrail path, so that guardrail-driven rewrites can be applied to buffered/replayed stream content.

## References

- PR: https://github.com/api7/aisix/pull/108
- Comment: https://github.com/api7/aisix/pull/108#discussion_r3258689929

/cc `@bzp2010`" \
  --assignee bzp2010

Length of output: 41


@bzp2010, I've created a tracking issue for this: #110, "feat(guardrail): support streaming output rewrite". It's assigned to you and includes context linking back to this PR and comment.


🧬 Code Graph Analysis Results

[src/proxy/handlers/format_handler.rs] – apply_stream_output_guardrails — summary

  • Description: Iterates over configured guardrails and, for each, if a payload is available and a guardrail outcome is returned, handles the outcome.
  • Parameters:
    • guardrails: &[Box]
    • payload: Option
  • Returns: Result<(), A::Error>
  • Important behavior:
    • If GuardrailOutcome::Rewrite(payload) is encountered, returns a GatewayError::Validation indicating streaming output rewrite is not supported yet.
    • If GuardrailOutcome::Block { reason } is encountered, returns a GatewayError::Validation with the blocking reason.
    • Allows GuardrailOutcome::Allow to pass, and otherwise continues processing.
  • Notable detail: The error message includes the guardrail name.
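The behaviour summarized above can be approximated with a synchronous sketch. The real function is async and generic over the adapter's error type; the `Guardrail` trait, `Outcome` enum, `DenyWord` guardrail, and the `String` error used here are simplified assumptions. Only the two error message formats are taken from the snippets quoted in this review.

```rust
/// Simplified stand-in for the guardrail outcome type.
enum Outcome {
    Allow,
    Rewrite(String),
    Block { reason: String },
}

/// Simplified, synchronous stand-in for a guardrail.
trait Guardrail {
    fn name(&self) -> &str;
    /// `None` means the guardrail does not apply to this stage.
    fn check(&self, payload: &str) -> Option<Outcome>;
}

/// Run every guardrail in order; the first Rewrite or Block aborts with an error.
fn apply_guardrails_sketch(guardrails: &[Box<dyn Guardrail>], payload: &str) -> Result<(), String> {
    for g in guardrails {
        match g.check(payload) {
            None | Some(Outcome::Allow) => continue,
            Some(Outcome::Rewrite(_)) => {
                return Err(format!(
                    "guardrail {} requested streaming output rewrite, which is not supported yet",
                    g.name()
                ));
            }
            Some(Outcome::Block { reason }) => {
                return Err(format!("guardrail {} blocked output: {}", g.name(), reason));
            }
        }
    }
    Ok(())
}

/// Example guardrail for the sketch: blocks any payload containing a word.
struct DenyWord(&'static str);

impl Guardrail for DenyWord {
    fn name(&self) -> &str {
        "deny-word"
    }
    fn check(&self, payload: &str) -> Option<Outcome> {
        if payload.contains(self.0) {
            Some(Outcome::Block { reason: format!("contains {:?}", self.0) })
        } else {
            Some(Outcome::Allow)
        }
    }
}
```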

[src/proxy/handlers/format_handler.rs] – handle_stream_response — summary

  • Description: Handles streaming responses; coordinates streaming guardrail checks and finalization.
  • Context: When upstream finishes, optional streaming guardrail payload is prepared via A::guardrail_stream_output_payload and then passed to apply_stream_output_guardrails.
  • Behavior:
    • If lifecycle_state.as_ref() is present, obtains a streaming payload and runs apply_stream_output_guardrails with it.
    • Propagates any errors from guardrail processing.
    • On success, continues to finalize the streaming response or emits appropriate SSE events.
  • Notable detail: This path shows where the unsupported streaming-output rewrite would surface (through apply_stream_output_guardrails).


@bzp2010 bzp2010 changed the title from "feat(guardrail): add whole replay for stream request" to "feat(guardrail): add whole replay for streaming chat" May 18, 2026

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
src/proxy/handlers/format_handler.rs (1)

127-132: ⚡ Quick win

Make this hook explicit instead of defaulting to Ok(None).

This default lets a new adapter compile cleanly even though guarded streams will fail later via require_stream_output_guardrail_payload. Requiring an explicit implementation here, or at least documenting that None is only valid when no stream-output guardrail can run, would surface the contract break much earlier.
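One of the alternatives raised in this nitpick, a default that fails loudly instead of returning `Ok(None)`, might look like the sketch below. The `Adapter` trait, its `name` method, and the `String` payload and error types are hypothetical simplifications of `FormatHandlerAdapter`, not its real signature.

```rust
/// Hypothetical, simplified adapter trait.
trait Adapter {
    fn name(&self) -> &'static str;

    /// Alternative default: an adapter that forgets to override this hook
    /// gets an explicit error instead of a silent `Ok(None)`.
    fn stream_output_payload(&self) -> Result<Option<String>, String> {
        Err(format!("{} does not support stream-output guardrails", self.name()))
    }
}

/// An adapter that relies on the default.
struct Forgetful;

impl Adapter for Forgetful {
    fn name(&self) -> &'static str {
        "forgetful"
    }
}

/// An adapter that provides a payload from what it has collected.
struct Chat {
    collected: String,
}

impl Adapter for Chat {
    fn name(&self) -> &'static str {
        "chat"
    }
    fn stream_output_payload(&self) -> Result<Option<String>, String> {
        Ok(Some(self.collected.clone()))
    }
}
```

Removing the default body entirely would move the failure from run time to compile time, at the cost of touching every existing adapter.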

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/format_handler.rs` around lines 127 - 132, The method
guardrail_stream_output_payload currently returns Ok(None) by default which
hides missing implementations and causes later failures in
require_stream_output_guardrail_payload; remove the default body so implementors
must provide an explicit implementation (i.e. make the trait method declaration
abstract/without a default impl) or, if you cannot remove the default, replace
Ok(None) with a clear error return (e.g. an Err variant signaling
"unsupported/missing stream-output guardrail") so the contract is enforced at
compile/runtime; update references to guardrail_stream_output_payload and
require_stream_output_guardrail_payload accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/proxy/handlers/format_handler.rs`:
- Around line 735-744: The first-token telemetry is recorded too early (on the
first upstream chunk that may be buffered) because the now_starts_output /
first_output_arrived check using A::starts_output(&chunk) is inside the upstream
chunk handling; move the first_output_arrived flag flip, the
hooks::observability::record_first_token_latency(&mut request_ctx).await call,
and the span.add_event(TraceEvent::new("first token arrived")...) into the
actual client emit paths (i.e., the functions/methods that perform the real
emit/send to the guarded streams or client) so the telemetry fires only when a
chunk is emitted to the caller; update any guarded stream emit helpers to
check/set first_output_arrived and call record_first_token_latency there and
remove the early recording in the upstream handler.
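A minimal sketch of the suggested placement, recording first-token latency at the point of client emission rather than on upstream arrival. `ClientEmitter` and its fields are hypothetical; the real code calls `hooks::observability::record_first_token_latency` and adds a span event, which are replaced here by a stored `Duration`.

```rust
use std::time::{Duration, Instant};

/// Hypothetical emit path that owns the first-token bookkeeping.
struct ClientEmitter {
    started: Instant,
    first_token_latency: Option<Duration>,
    sent: Vec<String>,
}

impl ClientEmitter {
    fn new() -> Self {
        Self { started: Instant::now(), first_token_latency: None, sent: Vec::new() }
    }

    /// Send one chunk to the client. Latency is recorded only here, so a chunk
    /// that is merely buffered upstream never counts as the first token.
    fn emit(&mut self, chunk: String, starts_output: bool) {
        if starts_output && self.first_token_latency.is_none() {
            self.first_token_latency = Some(self.started.elapsed());
        }
        self.sent.push(chunk);
    }
}
```

With a buffered stream this means the reported latency includes the guardrail check, which matches what the client actually experiences.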


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dc49158e-2e06-46e6-b0fd-5fde31391eba

📥 Commits

Reviewing files that changed from the base of the PR and between f3d62c8 and c93173a.

📒 Files selected for processing (1)
  • src/proxy/handlers/format_handler.rs

Comment thread src/proxy/handlers/format_handler.rs Outdated
@bzp2010 bzp2010 merged commit d7fc71f into main May 18, 2026
3 checks passed
@bzp2010 bzp2010 deleted the bzp/feat-guradrail-stream-whole-replay branch May 18, 2026 12:32