Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: rework open-metrics API handler to better surface errors #2015

Merged
merged 1 commit into from
Mar 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 29 additions & 21 deletions crates/agent/src/api/public/open_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::collections::BTreeMap;
use std::fmt::Write;
use std::sync::Arc;

use crate::api::{public::ApiErrorExt, App, ControlClaims};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use chrono::{Datelike, TimeZone};
use futures::TryStreamExt;
use futures::StreamExt;
use ops::stats::DocsAndBytes;
use std::collections::BTreeMap;
use std::fmt::Write;
use std::sync::Arc;

#[axum::debug_handler]
pub async fn handle_get_metrics(
Expand All @@ -21,12 +21,12 @@ pub async fn handle_get_metrics(
{
Ok(false) => {
return anyhow::anyhow!("user is not authorized to {prefix}")
.with_status(axum::http::StatusCode::UNAUTHORIZED)
.with_status(StatusCode::UNAUTHORIZED)
.into_response()
}
Err(err) => {
return err
.with_status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.with_status(StatusCode::INTERNAL_SERVER_ERROR)
.into_response()
}
Ok(true) => (),
Expand All @@ -47,30 +47,38 @@ pub async fn handle_get_metrics(
// and all samples appear contiguously, so we jump through some hoops
// to efficiently accumulate chunked buffers that we can yield in the
// correct order once we're done.
let stream = coroutines::try_coroutine(move |mut co| async move {
let mut buf = BufferParts::new();
let mut buf = BufferParts::new();

for metric in REGISTRY {
metric.declare(&mut buf);
}
for metric in REGISTRY {
metric.declare(&mut buf);
}

let mut stats = sqlx::query!(
r#"
let mut stats = sqlx::query!(
r#"
SELECT flow_document AS "stats: sqlx::types::Json<CatalogStats>"
FROM catalog_stats
WHERE starts_with(catalog_name, $1) AND right(catalog_name, 1) != '/'
AND grain = 'monthly'
AND ts = $2
"#,
prefix,
now_month,
)
.fetch(&pg_pool);

while let Some(stat) = stats.try_next().await? {
encode_metrics(&mut buf, scrape_at, stat.stats.0);
prefix,
now_month,
)
.fetch(&pg_pool);

loop {
match stats.next().await {
Some(Ok(stat)) => encode_metrics(&mut buf, scrape_at, stat.stats.0),
Some(Err(err)) => {
return err
.with_status(StatusCode::INTERNAL_SERVER_ERROR)
.into_response()
}
None => break,
}
}

let stream = coroutines::try_coroutine(move |mut co| async move {
for group in buf.groups {
for chunk in group {
() = co.yield_(chunk.freeze()).await;
Expand Down
Loading