diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs
index 43ef588e192..064448e0270 100644
--- a/quickwit/quickwit-storage/src/metrics.rs
+++ b/quickwit/quickwit-storage/src/metrics.rs
@@ -16,7 +16,7 @@
 use once_cell::sync::Lazy;
 use quickwit_common::metrics::{
-    GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec,
+    GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec,
     new_gauge, new_histogram_vec,
 };
@@ -30,19 +30,13 @@ pub struct StorageMetrics {
     pub searcher_split_cache: CacheMetrics,
     pub get_slice_timeout_successes: [IntCounter; 3],
     pub get_slice_timeout_all_timeouts: IntCounter,
-    pub object_storage_get_total: IntCounter,
-    pub object_storage_get_errors_total: IntCounterVec<1>,
+    pub object_storage_requests_total: IntCounterVec<2>,
+    pub object_storage_request_duration: HistogramVec<2>,
     pub object_storage_get_slice_in_flight_count: IntGauge,
     pub object_storage_get_slice_in_flight_num_bytes: IntGauge,
-    pub object_storage_put_total: IntCounter,
-    pub object_storage_put_parts: IntCounter,
-    pub object_storage_download_num_bytes: IntCounter,
-    pub object_storage_upload_num_bytes: IntCounter,
-
-    pub object_storage_delete_requests_total: IntCounter,
-    pub object_storage_bulk_delete_requests_total: IntCounter,
-    pub object_storage_delete_request_duration: Histogram,
-    pub object_storage_bulk_delete_request_duration: Histogram,
+    pub object_storage_download_num_bytes: IntCounterVec<1>,
+    pub object_storage_download_errors: IntCounterVec<1>,
+    pub object_storage_upload_num_bytes: IntCounterVec<1>,
 }
 
 impl Default for StorageMetrics {
@@ -63,31 +57,6 @@ impl Default for StorageMetrics {
         let get_slice_timeout_all_timeouts =
             get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]);
 
-        let object_storage_requests_total = new_counter_vec(
-            "object_storage_requests_total",
-            "Total number of object storage requests performed.",
-            "storage",
-            &[],
-            ["action"],
-        );
-        let object_storage_delete_requests_total =
-            object_storage_requests_total.with_label_values(["delete_object"]);
-        let object_storage_bulk_delete_requests_total =
-            object_storage_requests_total.with_label_values(["delete_objects"]);
-
-        let object_storage_request_duration = new_histogram_vec(
-            "object_storage_request_duration_seconds",
-            "Duration of object storage requests in seconds.",
-            "storage",
-            &[],
-            ["action"],
-            vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
-        );
-        let object_storage_delete_request_duration =
-            object_storage_request_duration.with_label_values(["delete_object"]);
-        let object_storage_bulk_delete_request_duration =
-            object_storage_request_duration.with_label_values(["delete_objects"]);
-
         StorageMetrics {
             fast_field_cache: CacheMetrics::for_component("fastfields"),
             fd_cache_metrics: CacheMetrics::for_component("fd"),
@@ -97,62 +66,63 @@ impl Default for StorageMetrics {
             split_footer_cache: CacheMetrics::for_component("splitfooter"),
             get_slice_timeout_successes,
             get_slice_timeout_all_timeouts,
-            object_storage_get_total: new_counter(
-                "object_storage_gets_total",
-                "Number of objects fetched. Might be lower than get_slice_timeout_outcome if \
-                 queries are debounced.",
+            object_storage_requests_total: new_counter_vec(
+                "object_storage_requests_total",
+                "Number of requests to the object store, by action and status. Requests are \
+                 recorded when the response headers are returned; download failures will not \
+                 appear as errors.",
                 "storage",
                 &[],
+                ["action", "status"],
             ),
-            object_storage_get_errors_total: new_counter_vec::<1>(
-                "object_storage_get_errors_total",
-                "Number of GetObject errors.",
+            object_storage_request_duration: new_histogram_vec(
+                "object_storage_request_duration",
+                "Durations until the response headers are returned from the object store, by \
+                 action and status. This does not measure the download time for the body content.",
                 "storage",
                 &[],
-                ["code"],
+                ["action", "status"],
+                vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
             ),
             object_storage_get_slice_in_flight_count: new_gauge(
                 "object_storage_get_slice_in_flight_count",
-                "Number of GetObject for which the memory was allocated but the download is still \
-                 in progress.",
+                "Number of get_object for which the memory was allocated but the download is \
+                 still in progress.",
                 "storage",
                 &[],
             ),
             object_storage_get_slice_in_flight_num_bytes: new_gauge(
                 "object_storage_get_slice_in_flight_num_bytes",
-                "Memory allocated for GetObject requests that are still in progress.",
+                "Memory allocated for get_object requests that are still in progress.",
                 "storage",
                 &[],
             ),
-            object_storage_put_total: new_counter(
-                "object_storage_puts_total",
-                "Number of objects uploaded. May differ from object_storage_requests_parts due to \
-                 multipart upload.",
+            object_storage_download_num_bytes: new_counter_vec(
+                "object_storage_download_num_bytes",
+                "Amount of data downloaded from object storage.",
                 "storage",
                 &[],
+                ["status"],
             ),
-            object_storage_put_parts: new_counter(
-                "object_storage_puts_parts",
-                "Number of object parts uploaded.",
-                "",
-                &[],
-            ),
-            object_storage_download_num_bytes: new_counter(
-                "object_storage_download_num_bytes",
-                "Amount of data downloaded from an object storage.",
+            object_storage_download_errors: new_counter_vec(
+                "object_storage_download_errors",
+                // Download errors are recorded separately because the associated
+                // get_object requests were already recorded as successful in
+                // object_storage_requests_total
+                "Number of download requests that received successful response headers but failed \
+                 during download.",
                 "storage",
                 &[],
+                ["status"],
             ),
-            object_storage_upload_num_bytes: new_counter(
+            object_storage_upload_num_bytes: new_counter_vec(
                 "object_storage_upload_num_bytes",
-                "Amount of data uploaded to an object storage.",
+                "Amount of data uploaded to object storage. The value recorded for failed and \
+                 aborted uploads is the full payload size.",
                 "storage",
                 &[],
+                ["status"],
             ),
-            object_storage_delete_requests_total,
-            object_storage_bulk_delete_requests_total,
-            object_storage_delete_request_duration,
-            object_storage_bulk_delete_request_duration,
         }
     }
 }
diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
index b21776fa69f..3bb6d9711dd 100644
--- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
+++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
@@ -45,10 +45,13 @@ use tracing::{instrument, warn};
 
 use crate::debouncer::DebouncedStorage;
 use crate::metrics::object_storage_get_slice_in_flight_guards;
+use crate::object_storage::metrics_wrappers::{
+    ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics,
+};
 use crate::storage::SendableAsync;
 use crate::{
-    BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, STORAGE_METRICS, Storage,
-    StorageError, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
+    BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError,
+    StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
 };
 
 /// Azure object storage resolver.
@@ -225,10 +228,6 @@ impl AzureBlobStorage {
         name: &'a str,
         payload: Box<dyn PutPayload>,
     ) -> StorageResult<()> {
-        crate::STORAGE_METRICS.object_storage_put_parts.inc();
-        crate::STORAGE_METRICS
-            .object_storage_upload_num_bytes
-            .inc_by(payload.len());
         retry(&self.retry_params, || async {
             let data = Bytes::from(payload.read_all().await?.to_vec());
             let hash = azure_storage_blobs::prelude::Hash::from(md5::compute(&data[..]).0);
@@ -237,6 +236,7 @@ impl AzureBlobStorage {
                 .put_block_blob(data)
                 .hash(hash)
                 .into_future()
+                .with_count_and_upload_metrics(ActionLabel::PutObject, payload.len())
                 .await?;
             Result::<(), AzureErrorWrapper>::Ok(())
         })
@@ -261,10 +261,6 @@ impl AzureBlobStorage {
             .map(|(num, range)| {
                 let moved_blob_client = blob_client.clone();
                 let moved_payload = payload.clone();
-                crate::STORAGE_METRICS.object_storage_put_parts.inc();
-                crate::STORAGE_METRICS
-                    .object_storage_upload_num_bytes
-                    .inc_by(range.end - range.start);
                 async move {
                     retry(&self.retry_params, || async {
                         let block_id = format!("block:{num}");
@@ -276,6 +272,10 @@ impl AzureBlobStorage {
                             .put_block(block_id.clone(), data)
                             .hash(hash)
                             .into_future()
+                            .with_count_and_upload_metrics(
+                                ActionLabel::UploadPart,
+                                range.end - range.start,
+                            )
                             .await?;
                         Result::<_, AzureErrorWrapper>::Ok(block_id)
                     })
@@ -299,6 +299,7 @@ impl AzureBlobStorage {
         blob_client
             .put_block_list(block_list)
             .into_future()
+            .with_count_metric(ActionLabel::CompleteMultipartUpload)
             .await
             .map_err(AzureErrorWrapper::from)?;
 
@@ -315,6 +316,7 @@ impl Storage for AzureBlobStorage {
             .max_results(NonZeroU32::new(1u32).expect("1 is always non-zero."))
             .into_stream()
             .next()
+            .with_count_metric(ActionLabel::ListObjects)
             .await
         {
             let _ = first_blob_result?;
@@ -327,7 +329,6 @@ impl Storage for AzureBlobStorage {
         path: &Path,
         payload: Box<dyn PutPayload>,
     ) -> crate::StorageResult<()> {
-        crate::STORAGE_METRICS.object_storage_put_total.inc();
         let name = self.blob_name(path);
         let total_len = payload.len();
         let part_num_bytes = self.multipart_policy.part_num_bytes(total_len);
@@ -345,7 +346,11 @@ impl Storage for AzureBlobStorage {
         let name = self.blob_name(path);
         let mut output_stream = self.container_client.blob_client(name).get().into_stream();
 
-        while let Some(chunk_result) = output_stream.next().await {
+        while let Some(chunk_result) = output_stream
+            .next()
+            .with_count_metric(ActionLabel::GetObject)
+            .await
+        {
             let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?;
             let chunk_response_body_stream = chunk_response
                 .data
@@ -353,10 +358,7 @@ impl Storage for AzureBlobStorage {
                 .into_async_read()
                 .compat();
             let mut body_stream_reader = BufReader::new(chunk_response_body_stream);
-            let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
-            STORAGE_METRICS
-                .object_storage_download_num_bytes
-                .inc_by(num_bytes_copied);
+            copy_with_download_metrics(&mut body_stream_reader, output).await?;
         }
         output.flush().await?;
         Ok(())
@@ -369,6 +371,7 @@ impl Storage for AzureBlobStorage {
             .blob_client(blob_name)
             .delete()
             .into_future()
+            .with_count_metric(ActionLabel::DeleteObject)
             .await
             .map_err(|err| AzureErrorWrapper::from(err).into());
         ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?;
@@ -491,6 +494,7 @@ impl Storage for AzureBlobStorage {
             .blob_client(name)
             .get_properties()
             .into_future()
+            .with_count_metric(ActionLabel::HeadObject)
             .await;
         match properties_result {
             Ok(response) => Ok(response.blob.properties.content_length),
@@ -513,7 +517,7 @@ async fn extract_range_data_and_hash(
         .await?
         .into_async_read();
     let mut buf: Vec<u8> = Vec::with_capacity(range.count());
-    tokio::io::copy(&mut reader, &mut buf).await?;
+    tokio::io::copy_buf(&mut reader, &mut buf).await?;
     let data = Bytes::from(buf);
     let hash = md5::compute(&data[..]);
     Ok((data, hash))
@@ -544,7 +548,11 @@ async fn download_all(
     output: &mut Vec<u8>,
 ) -> Result<(), AzureErrorWrapper> {
     output.clear();
-    while let Some(chunk_result) = chunk_stream.next().await {
+    while let Some(chunk_result) = chunk_stream
+        .next()
+        .with_count_metric(ActionLabel::GetObject)
+        .await
+    {
         let chunk_response = chunk_result?;
         let chunk_response_body_stream = chunk_response
             .data
@@ -552,10 +560,7 @@ async fn download_all(
             .into_async_read()
             .compat();
         let mut body_stream_reader = BufReader::new(chunk_response_body_stream);
-        let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
-        crate::STORAGE_METRICS
-            .object_storage_download_num_bytes
-            .inc_by(num_bytes_copied);
+        copy_with_download_metrics(&mut body_stream_reader, output).await?;
     }
     // When calling `get_all`, the Vec capacity is not properly set.
     output.shrink_to_fit();
diff --git a/quickwit/quickwit-storage/src/object_storage/error.rs b/quickwit/quickwit-storage/src/object_storage/error.rs
index 5f60fe1f944..8a7efc13332 100644
--- a/quickwit/quickwit-storage/src/object_storage/error.rs
+++ b/quickwit/quickwit-storage/src/object_storage/error.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use aws_sdk_s3::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError};
+use aws_sdk_s3::error::{DisplayErrorContext, SdkError};
 use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
 use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
 use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
@@ -62,11 +62,6 @@ pub trait ToStorageErrorKind {
 
 impl ToStorageErrorKind for GetObjectError {
     fn to_storage_error_kind(&self) -> StorageErrorKind {
-        let error_code = self.code().unwrap_or("unknown");
-        crate::STORAGE_METRICS
-            .object_storage_get_errors_total
-            .with_label_values([error_code])
-            .inc();
         match self {
             GetObjectError::InvalidObjectState(_) => StorageErrorKind::Service,
             GetObjectError::NoSuchKey(_) => StorageErrorKind::NotFound,
diff --git a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs
new file mode 100644
index 00000000000..f2d92991984
--- /dev/null
+++ b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs
@@ -0,0 +1,426 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::{Context, Poll, ready};
+use std::time::Instant;
+
+use pin_project::{pin_project, pinned_drop};
+use tokio::io::{AsyncBufRead, AsyncWrite};
+
+use crate::STORAGE_METRICS;
+
+pub enum ActionLabel {
+    AbortMultipartUpload,
+    CompleteMultipartUpload,
+    CreateMultipartUpload,
+    DeleteObject,
+    DeleteObjects,
+    GetObject,
+    HeadObject,
+    ListObjects,
+    PutObject,
+    UploadPart,
+}
+
+impl ActionLabel {
+    fn as_str(&self) -> &'static str {
+        match self {
+            ActionLabel::AbortMultipartUpload => "abort_multipart_upload",
+            ActionLabel::CompleteMultipartUpload => "complete_multipart_upload",
+            ActionLabel::CreateMultipartUpload => "create_multipart_upload",
+            ActionLabel::DeleteObject => "delete_object",
+            ActionLabel::DeleteObjects => "delete_objects",
+            ActionLabel::GetObject => "get_object",
+            ActionLabel::HeadObject => "head_object",
+            ActionLabel::ListObjects => "list_objects",
+            ActionLabel::PutObject => "put_object",
+            ActionLabel::UploadPart => "upload_part",
+        }
+    }
+}
+
+pub enum RequestStatus {
+    Pending,
+    // only useful on feature="azure"
+    #[allow(dead_code)]
+    Done,
+    Ready(String),
+}
+
+/// Converts an object store client SDK `Result` to the [`RequestStatus`] that
+/// should be recorded in the metrics.
+///
+/// The `Marker` type is necessary to avoid conflicting implementations of the
+/// trait.
+pub trait AsRequestStatus<Marker> {
+    fn as_status(&self) -> RequestStatus;
+}
+
+/// Wrapper around object store requests to record metrics, including cancellation.
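+///
+/// On drop, the wrapper increments `object_storage_requests_total` with the
+/// `(action, status)` label pair and, when configured, records the request
+/// duration and the number of uploaded bytes. A future dropped before
+/// completion is recorded with status "cancelled".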
+#[pin_project(PinnedDrop)]
+pub struct RequestMetricsWrapper<F, Marker>
+where
+    F: Future,
+    F::Output: AsRequestStatus<Marker>,
+{
+    #[pin]
+    tracked: F,
+    action: ActionLabel,
+    start: Option<Instant>,
+    uploaded_bytes: Option<u64>,
+    status: RequestStatus,
+    _marker: PhantomData<Marker>,
+}
+
+#[pinned_drop]
+impl<F, Marker> PinnedDrop for RequestMetricsWrapper<F, Marker>
+where
+    F: Future,
+    F::Output: AsRequestStatus<Marker>,
+{
+    fn drop(self: Pin<&mut Self>) {
+        let status = match &self.status {
+            RequestStatus::Pending => "cancelled",
+            RequestStatus::Done => return,
+            RequestStatus::Ready(s) => s.as_str(),
+        };
+        let label_values = [self.action.as_str(), status];
+        STORAGE_METRICS
+            .object_storage_requests_total
+            .with_label_values(label_values)
+            .inc();
+        if let Some(start) = self.start {
+            STORAGE_METRICS
+                .object_storage_request_duration
+                .with_label_values(label_values)
+                .observe(start.elapsed().as_secs_f64());
+        }
+        if let Some(bytes) = self.uploaded_bytes {
+            STORAGE_METRICS
+                .object_storage_upload_num_bytes
+                .with_label_values([status])
+                .inc_by(bytes);
+        }
+    }
+}
+
+impl<F, Marker> Future for RequestMetricsWrapper<F, Marker>
+where
+    F: Future,
+    F::Output: AsRequestStatus<Marker>,
+{
+    type Output = F::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+        let response = ready!(this.tracked.poll(cx));
+        *this.status = response.as_status();
+
+        Poll::Ready(response)
+    }
+}
+
+pub trait RequestMetricsWrapperExt<F, Marker>
+where
+    F: Future,
+    F::Output: AsRequestStatus<Marker>,
+{
+    fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper<F, Marker>;
+
+    fn with_count_and_duration_metrics(
+        self,
+        action: ActionLabel,
+    ) -> RequestMetricsWrapper<F, Marker>;
+
+    fn with_count_and_upload_metrics(
+        self,
+        action: ActionLabel,
+        bytes: u64,
+    ) -> RequestMetricsWrapper<F, Marker>;
+}
+
+impl<F, Marker> RequestMetricsWrapperExt<F, Marker> for F
+where
+    F: Future,
+    F::Output: AsRequestStatus<Marker>,
+{
+    fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper<F, Marker> {
+        RequestMetricsWrapper {
+            tracked: self,
+            action,
+            status: RequestStatus::Pending,
+            start: None,
+            uploaded_bytes: None,
+            _marker: PhantomData,
+        }
+    }
+
+    fn with_count_and_duration_metrics(
+        self,
+        action: ActionLabel,
+    ) -> RequestMetricsWrapper<F, Marker> {
+        RequestMetricsWrapper {
+            tracked: self,
+            action,
+            status: RequestStatus::Pending,
+            start: Some(Instant::now()),
+            uploaded_bytes: None,
+            _marker: PhantomData,
+        }
+    }
+
+    fn with_count_and_upload_metrics(
+        self,
+        action: ActionLabel,
+        bytes: u64,
+    ) -> RequestMetricsWrapper<F, Marker> {
+        RequestMetricsWrapper {
+            tracked: self,
+            action,
+            status: RequestStatus::Pending,
+            start: None,
+            uploaded_bytes: Some(bytes),
+            _marker: PhantomData,
+        }
+    }
+}
+
+pub struct S3Marker;
+
+impl<T, E> AsRequestStatus<S3Marker> for Result<T, E>
+where E: aws_sdk_s3::error::ProvideErrorMetadata
+{
+    fn as_status(&self) -> RequestStatus {
+        let status_str = match self {
+            Ok(_) => "success".to_string(),
+            Err(e) => e.meta().code().unwrap_or("unknown").to_string(),
+        };
+        RequestStatus::Ready(status_str)
+    }
+}
+
+#[cfg(feature = "azure")]
+pub struct AzureMarker;
+
+#[cfg(feature = "azure")]
+impl<T> AsRequestStatus<AzureMarker> for Result<T, azure_storage::Error> {
+    fn as_status(&self) -> RequestStatus {
+        let Err(err) = self else {
+            return RequestStatus::Ready("success".to_string());
+        };
+        let err_status_str = match err.kind() {
+            azure_storage::ErrorKind::HttpResponse { status, .. } => status.to_string(),
+            azure_storage::ErrorKind::Credential => "credential".to_string(),
+            azure_storage::ErrorKind::Io => "io".to_string(),
+            azure_storage::ErrorKind::DataConversion => "data_conversion".to_string(),
+            _ => "unknown".to_string(),
+        };
+        RequestStatus::Ready(err_status_str)
+    }
+}
+
+// The Azure SDK get_blob request returns Option<Result<...>> because it chunks
+// the download into a stream of get requests.
+#[cfg(feature = "azure")]
+impl<T> AsRequestStatus<AzureMarker> for Option<Result<T, azure_storage::Error>> {
+    fn as_status(&self) -> RequestStatus {
+        match self {
+            None => RequestStatus::Done,
+            Some(res) => res.as_status(),
+        }
+    }
+}
+
+pub enum DownloadStatus {
+    InProgress,
+    Done,
+    Failed(&'static str),
+}
+
+/// Track io errors during downloads.
+///
+/// Downloads are a bit different from other requests because the request might
+/// fail while getting the bytes from the response body, long after getting a
+/// successful response header.
+#[pin_project(PinnedDrop)]
+struct DownloadMetricsWrapper<'a, R, W>
+where
+    R: AsyncBufRead + Unpin + ?Sized,
+    W: AsyncWrite + Unpin + ?Sized,
+{
+    #[pin]
+    tracked: copy_buf::CopyBuf<'a, R, W>,
+    status: DownloadStatus,
+}
+
+#[pinned_drop]
+impl<'a, R, W> PinnedDrop for DownloadMetricsWrapper<'a, R, W>
+where
+    R: AsyncBufRead + Unpin + ?Sized,
+    W: AsyncWrite + Unpin + ?Sized,
+{
+    fn drop(self: Pin<&mut Self>) {
+        let error_opt = match &self.status {
+            DownloadStatus::InProgress => Some("cancelled"),
+            DownloadStatus::Failed(e) => Some(*e),
+            DownloadStatus::Done => None,
+        };
+
+        STORAGE_METRICS
+            .object_storage_download_num_bytes
+            .with_label_values([error_opt.unwrap_or("success")])
+            .inc_by(self.tracked.amt);
+
+        if let Some(error) = error_opt {
+            STORAGE_METRICS
+                .object_storage_download_errors
+                .with_label_values([error])
+                .inc();
+        }
+    }
+}
+
+impl<'a, R, W> Future for DownloadMetricsWrapper<'a, R, W>
+where
+    R: AsyncBufRead + Unpin + ?Sized,
+    W: AsyncWrite + Unpin + ?Sized,
+{
+    type Output = io::Result<u64>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+        let response = ready!(this.tracked.poll(cx));
+        *this.status = match &response {
+            Ok(_) => DownloadStatus::Done,
+            Err(e) => DownloadStatus::Failed(io_error_as_label(e.kind())),
+        };
+        Poll::Ready(response)
+    }
+}
+
+pub async fn copy_with_download_metrics<'a, R, W>(
+    reader: &'a mut R,
+    writer: &'a mut W,
+) -> io::Result<u64>
+where
+    R: AsyncBufRead + Unpin + ?Sized,
+    W: AsyncWrite + Unpin + ?Sized,
+{
+    DownloadMetricsWrapper {
+        tracked: copy_buf::CopyBuf {
+            reader,
+            writer,
+            amt: 0,
+        },
+        status: DownloadStatus::InProgress,
+    }
+    .await
+}
+
+/// This is a fork of `tokio::io::copy_buf` that enables tracking the number of
+/// bytes transferred. This estimate should be accurate as long as the network
+/// is the bottleneck.
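+///
+/// Unlike the tokio version, the struct fields (notably the `amt` byte
+/// counter) are public, so `DownloadMetricsWrapper` can read the number of
+/// bytes transferred so far when the copy fails or is dropped mid-download.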
+mod copy_buf {
+
+    use std::future::Future;
+    use std::io;
+    use std::pin::Pin;
+    use std::task::{Context, Poll, ready};
+
+    use tokio::io::{AsyncBufRead, AsyncWrite};
+
+    #[derive(Debug)]
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> {
+        pub reader: &'a mut R,
+        pub writer: &'a mut W,
+        pub amt: u64,
+    }
+
+    impl<R, W> Future for CopyBuf<'_, R, W>
+    where
+        R: AsyncBufRead + Unpin + ?Sized,
+        W: AsyncWrite + Unpin + ?Sized,
+    {
+        type Output = io::Result<u64>;
+
+        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+            loop {
+                let me = &mut *self;
+                let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?;
+                if buffer.is_empty() {
+                    ready!(Pin::new(&mut self.writer).poll_flush(cx))?;
+                    return Poll::Ready(Ok(self.amt));
+                }
+
+                let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?;
+                if i == 0 {
+                    return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
+                }
+                self.amt += i as u64;
+                Pin::new(&mut *self.reader).consume(i);
+            }
+        }
+    }
+}
+
+fn io_error_as_label(error: io::ErrorKind) -> &'static str {
+    use io::ErrorKind::*;
+    // most of these variants are not expected to happen
+    match error {
+        AddrInUse => "addr_in_use",
+        AddrNotAvailable => "addr_not_available",
+        AlreadyExists => "already_exists",
+        ArgumentListTooLong => "argument_list_too_long",
+        BrokenPipe => "broken_pipe",
+        ConnectionAborted => "connection_aborted",
+        ConnectionRefused => "connection_refused",
+        ConnectionReset => "connection_reset",
+        CrossesDevices => "crosses_devices",
+        Deadlock => "deadlock",
+        DirectoryNotEmpty => "directory_not_empty",
+        ExecutableFileBusy => "executable_file_busy",
+        FileTooLarge => "file_too_large",
+        HostUnreachable => "host_unreachable",
+        Interrupted => "interrupted",
+        InvalidData => "invalid_data",
+        InvalidFilename => "invalid_filename",
+        InvalidInput => "invalid_input",
+        IsADirectory => "is_a_directory",
+        NetworkDown => "network_down",
+        NetworkUnreachable => "network_unreachable",
+        NotADirectory => "not_a_directory",
+        NotConnected => "not_connected",
+        NotFound => "not_found",
+        NotSeekable => "not_seekable",
+        Other => "other",
+        OutOfMemory => "out_of_memory",
+        PermissionDenied => "permission_denied",
+        QuotaExceeded => "quota_exceeded",
+        ReadOnlyFilesystem => "read_only_filesystem",
+        ResourceBusy => "resource_busy",
+        StaleNetworkFileHandle => "stale_network_file_handle",
+        StorageFull => "storage_full",
+        TimedOut => "timed_out",
+        TooManyLinks => "too_many_links",
+        UnexpectedEof => "unexpected_eof",
+        Unsupported => "unsupported",
+        WouldBlock => "would_block",
+        WriteZero => "write_zero",
+        _ => "uncategorized",
+    }
+}
diff --git a/quickwit/quickwit-storage/src/object_storage/mod.rs b/quickwit/quickwit-storage/src/object_storage/mod.rs
index e914c107291..cee3bacd338 100644
--- a/quickwit/quickwit-storage/src/object_storage/mod.rs
+++ b/quickwit/quickwit-storage/src/object_storage/mod.rs
@@ -14,6 +14,8 @@
 
 mod error;
 
+mod metrics_wrappers;
+
 mod s3_compatible_storage;
 pub use self::s3_compatible_storage::S3CompatibleObjectStorage;
 pub use self::s3_compatible_storage_resolver::S3CompatibleObjectStorageFactory;
diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
index 6a7105fb8f1..9d6d376205e 100644
--- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
+++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
@@ -46,10 +46,13 @@ use tracing::{info, instrument, warn};
 
 use crate::metrics::object_storage_get_slice_in_flight_guards;
 use crate::object_storage::MultiPartPolicy;
+use crate::object_storage::metrics_wrappers::{
+    ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics,
+};
 use crate::storage::SendableAsync;
 use crate::{
-    BulkDeleteError, DeleteFailure, OwnedBytes, STORAGE_METRICS, Storage, StorageError,
-    StorageErrorKind, StorageResolverError, StorageResult,
+    BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind,
+    StorageResolverError, StorageResult,
 };
 
 /// Semaphore to limit the number of concurrent requests to the object store. Some object stores
@@ -286,11 +289,6 @@ impl S3CompatibleObjectStorage {
             .await
             .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?;
 
-        crate::STORAGE_METRICS.object_storage_put_parts.inc();
-        crate::STORAGE_METRICS
-            .object_storage_upload_num_bytes
-            .inc_by(len);
-
         self.s3_client
             .put_object()
             .bucket(bucket)
@@ -298,6 +296,7 @@ impl S3CompatibleObjectStorage {
             .body(body)
             .content_length(len as i64)
             .send()
+            .with_count_and_upload_metrics(ActionLabel::PutObject, len)
            .await
            .map_err(|sdk_error| {
                if sdk_error.is_retryable() {
@@ -332,6 +331,7 @@ impl S3CompatibleObjectStorage {
                .bucket(self.bucket.clone())
                .key(key)
                .send()
+                .with_count_metric(ActionLabel::CreateMultipartUpload)
                .await
        })
        .await?
@@ -421,11 +421,6 @@ impl S3CompatibleObjectStorage {
            .map_err(Retry::Permanent)?;
        let md5 = BASE64_STANDARD.encode(part.md5.0);
 
-        crate::STORAGE_METRICS.object_storage_put_parts.inc();
-        crate::STORAGE_METRICS
-            .object_storage_upload_num_bytes
-            .inc_by(part.len());
-
        let upload_part_output = self
            .s3_client
            .upload_part()
@@ -437,6 +432,7 @@ impl S3CompatibleObjectStorage {
            .part_number(part.part_number as i32)
            .upload_id(upload_id.0)
            .send()
+            .with_count_and_upload_metrics(ActionLabel::UploadPart, part.len())
            .await
            .map_err(|s3_err| {
                if s3_err.is_retryable() {
@@ -516,6 +512,7 @@ impl S3CompatibleObjectStorage {
                .multipart_upload(completed_upload.clone())
                .upload_id(upload_id)
                .send()
+                .with_count_metric(ActionLabel::CompleteMultipartUpload)
                .await
        })
        .await?;
@@ -530,6 +527,7 @@ impl S3CompatibleObjectStorage {
                .key(key)
                .upload_id(upload_id)
                .send()
+                .with_count_metric(ActionLabel::AbortMultipartUpload)
                .await
        })
        .await?;
@@ -544,8 +542,6 @@ impl S3CompatibleObjectStorage {
        let key = self.key(path);
        let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1));
 
-        crate::STORAGE_METRICS.object_storage_get_total.inc();
-
        let get_object_output = self
            .s3_client
            .get_object()
@@ -553,6 +549,7 @@ impl S3CompatibleObjectStorage {
            .key(key)
            .set_range(range_str)
            .send()
+            .with_count_metric(ActionLabel::GetObject)
            .await?;
        Ok(get_object_output)
    }
@@ -640,17 +637,12 @@ impl S3CompatibleObjectStorage {
        for (path_chunk, delete) in &mut delete_requests_it {
            let delete_objects_res: StorageResult<DeleteObjectsOutput> =
                aws_retry(&self.retry_params, || async {
-                    crate::STORAGE_METRICS
-                        .object_storage_bulk_delete_requests_total
-                        .inc();
-                    let _timer = crate::STORAGE_METRICS
-                        .object_storage_bulk_delete_request_duration
-                        .start_timer();
                    self.s3_client
                        .delete_objects()
                        .bucket(self.bucket.clone())
                        .delete(delete.clone())
                        .send()
+                        .with_count_and_duration_metrics(ActionLabel::DeleteObjects)
                        .await
                })
                .await
@@ -716,10 +708,7 @@ impl S3CompatibleObjectStorage {
 async fn download_all(byte_stream: ByteStream, output: &mut Vec<u8>) -> io::Result<()> {
     output.clear();
     let mut body_stream_reader = BufReader::new(byte_stream.into_async_read());
-    let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
-    STORAGE_METRICS
-        .object_storage_download_num_bytes
-        .inc_by(num_bytes_copied);
+    copy_with_download_metrics(&mut body_stream_reader, output).await?;
     // When calling `get_all`, the Vec capacity is not properly set.
     output.shrink_to_fit();
     Ok(())
@@ -735,6 +724,7 @@ impl Storage for S3CompatibleObjectStorage {
             .bucket(self.bucket.clone())
             .max_keys(1)
             .send()
+            .with_count_metric(ActionLabel::ListObjects)
             .await?;
         Ok(())
     }
@@ -744,7 +734,6 @@ impl Storage for S3CompatibleObjectStorage {
         path: &Path,
         payload: Box<dyn PutPayload>,
     ) -> crate::StorageResult<()> {
-        crate::STORAGE_METRICS.object_storage_put_total.inc();
         let _permit = REQUEST_SEMAPHORE.acquire().await;
         let key = self.key(path);
         let total_len = payload.len();
@@ -763,10 +752,7 @@ impl Storage for S3CompatibleObjectStorage {
         let get_object_output =
             aws_retry(&self.retry_params, || self.get_object(path, None)).await?;
         let mut body_read = BufReader::new(get_object_output.body.into_async_read());
-        let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?;
-        STORAGE_METRICS
-            .object_storage_download_num_bytes
-            .inc_by(num_bytes_copied);
+        copy_with_download_metrics(&mut body_read, output).await?;
         output.flush().await?;
         Ok(())
     }
@@ -776,17 +762,12 @@ impl Storage for S3CompatibleObjectStorage {
         let bucket = self.bucket.clone();
         let key = self.key(path);
         let delete_res = aws_retry(&self.retry_params, || async {
-            crate::STORAGE_METRICS
-                .object_storage_delete_requests_total
-                .inc();
-            let _timer = crate::STORAGE_METRICS
-                .object_storage_delete_request_duration
-                .start_timer();
             self.s3_client
                 .delete_object()
                 .bucket(&bucket)
                 .key(&key)
                 .send()
+                .with_count_and_duration_metrics(ActionLabel::DeleteObject)
                 .await
         })
         .await;
@@ -867,6 +848,7 @@ impl Storage for S3CompatibleObjectStorage {
                 .bucket(&bucket)
                 .key(&key)
                 .send()
+                .with_count_metric(ActionLabel::HeadObject)
                 .await
         })
         .await?;
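Reviewer note: the snippet below is a minimal sketch of the call-site pattern this diff introduces, mirroring the get_object instrumentation in s3_compatible_storage.rs. The client, bucket, and key values are hypothetical; only `with_count_metric` and `ActionLabel` come from the new metrics_wrappers.rs.

    // Hypothetical call site (illustrative only, not part of this patch).
    let get_object_output = s3_client
        .get_object()
        .bucket("example-bucket")        // hypothetical bucket
        .key("splits/example.split")     // hypothetical key
        .send()
        // Wraps the SDK future; when it completes (or is dropped), one
        // (action="get_object", status=...) sample is recorded in
        // object_storage_requests_total. Cancellation shows up as
        // status="cancelled".
        .with_count_metric(ActionLabel::GetObject)
        .await?;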