-
Notifications
You must be signed in to change notification settings - Fork 475
Reorganize object store metrics #5821
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
68c7373
to
8bdb431
Compare
8bdb431
to
88528bf
Compare
quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs
Outdated
Show resolved
Hide resolved
quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
Outdated
Show resolved
Hide resolved
0d1674d
to
93ce4eb
Compare
ActionLabel::DeleteObject => "delete_object", | ||
ActionLabel::DeleteObjects => "delete_objects", | ||
ActionLabel::GetObject => "get_object", | ||
ActionLabel::HeadObject => "head_object", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Azure: get_properties
S3: head_object
Opendal: stat
- Use explicit label values - Track download at the copy level Unified label values for object store actions.
93ce4eb
to
cb7eb48
Compare
/// This is a fork of `tokio::io::copy_buf` that enables tracking the number of | ||
/// bytes transferred. This estimate should be accurate as long as the network | ||
/// is the bottleneck. | ||
mod copy_buf { | ||
|
||
use std::future::Future; | ||
use std::io; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll, ready}; | ||
|
||
use tokio::io::{AsyncBufRead, AsyncWrite}; | ||
|
||
#[derive(Debug)] | ||
#[must_use = "futures do nothing unless you `.await` or poll them"] | ||
pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> { | ||
pub reader: &'a mut R, | ||
pub writer: &'a mut W, | ||
pub amt: u64, | ||
} | ||
|
||
impl<R, W> Future for CopyBuf<'_, R, W> | ||
where | ||
R: AsyncBufRead + Unpin + ?Sized, | ||
W: AsyncWrite + Unpin + ?Sized, | ||
{ | ||
type Output = io::Result<u64>; | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
loop { | ||
let me = &mut *self; | ||
let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; | ||
if buffer.is_empty() { | ||
ready!(Pin::new(&mut self.writer).poll_flush(cx))?; | ||
return Poll::Ready(Ok(self.amt)); | ||
} | ||
|
||
let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; | ||
if i == 0 { | ||
return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); | ||
} | ||
self.amt += i as u64; | ||
Pin::new(&mut *self.reader).consume(i); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was actually pretty simple to have an estimate for the number of bytes already downloaded when the error occurred, so I added it.
Description
Closes #5799
Rationalize object store metrics. Be more exhaustive in request and error recording.
Important: This is a breaking change as some metrics are renamed.
How was this PR tested?
TODO: add tests to the metrics wrappers