diff --git a/.gitignore b/.gitignore index 59019604e1..d6e75923ad 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ vcpkg_installed/ # Editor user customizations. .vscode/launch.json +.vscode/mcp.json .idea/ # Editor temp files. diff --git a/sdk/core/azure_core/src/http/pager.rs b/sdk/core/azure_core/src/http/pager.rs index 7b85a6244c..1a62dd1073 100644 --- a/sdk/core/azure_core/src/http/pager.rs +++ b/sdk/core/azure_core/src/http/pager.rs @@ -3,7 +3,11 @@ //! Types and methods for pageable responses. -use crate::http::{headers::HeaderName, response::Response, DeserializeWith, Format, JsonFormat}; +use crate::http::{ + headers::HeaderName, policies::create_public_api_span, response::Response, Context, + DeserializeWith, Format, JsonFormat, +}; +use crate::tracing::Span; use async_trait::async_trait; use futures::{stream::unfold, FutureExt, Stream}; use std::{ @@ -205,6 +209,13 @@ type BoxedStream

= Box> + Send>; #[cfg(target_arch = "wasm32")] type BoxedStream

= Box>>; +/// Options for configuring a [`Pager`]'s behavior. +#[derive(Clone, Debug, Default)] +pub struct PagerOptions<'a> { + /// Context for HTTP requests made by the pager. + pub context: Context<'a>, +} + /// Iterates over a collection of items or individual pages of items from a service. /// /// You can asynchronously iterate over items returned by a collection request to a service, @@ -302,7 +313,7 @@ impl ItemIterator

{ /// } /// let url = "https://example.com/my_paginated_api".parse().unwrap(); /// let mut base_req = Request::new(url, Method::Get); - /// let pager = ItemIterator::from_callback(move |next_link: PagerState| { + /// let pager = ItemIterator::from_callback(move |next_link: PagerState, ctx: Context| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let api_version = api_version.clone(); @@ -321,7 +332,7 @@ impl ItemIterator

{ /// .append_pair("api-version", &api_version); /// } /// let resp = pipeline - /// .send(&Context::new(), &mut req, None) + /// .send(&ctx, &mut req, None) /// .await?; /// let (status, headers, body) = resp.deconstruct(); /// let result: ListItemsResult = json::from_json(&body)?; @@ -334,7 +345,7 @@ impl ItemIterator

{ /// None => PagerResult::Done { response: resp } /// }) /// } - /// }); + /// }, None); /// ``` /// /// To page results using headers: @@ -357,7 +368,7 @@ impl ItemIterator

{ /// } /// let url = "https://example.com/my_paginated_api".parse().unwrap(); /// let mut base_req = Request::new(url, Method::Get); - /// let pager = ItemIterator::from_callback(move |continuation| { + /// let pager = ItemIterator::from_callback(move |continuation, ctx| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let mut req = base_req.clone(); @@ -366,25 +377,36 @@ impl ItemIterator

{ /// req.insert_header("x-ms-continuation", continuation); /// } /// let resp: Response = pipeline - /// .send(&Context::new(), &mut req, None) + /// .send(&ctx, &mut req, None) /// .await? /// .into(); /// Ok(PagerResult::from_response_header(resp, &HeaderName::from_static("x-next-continuation"))) /// } - /// }); + /// },None); /// ``` pub fn from_callback< + 'a, // This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds. #[cfg(not(target_arch = "wasm32"))] C: AsRef + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState) -> Fut + Send + 'static, + #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, Context<'a>) -> Fut + Send + 'static, #[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static, #[cfg(target_arch = "wasm32")] C: AsRef + 'static, - #[cfg(target_arch = "wasm32")] F: Fn(PagerState) -> Fut + 'static, + #[cfg(target_arch = "wasm32")] F: Fn(PagerState, Context<'a>) -> Fut + 'static, #[cfg(target_arch = "wasm32")] Fut: Future>> + 'static, >( make_request: F, - ) -> Self { - Self::from_stream(iter_from_callback(make_request, || None, |_| {})) + options: Option>, + ) -> Self + where + 'a: 'static, + { + let options = options.unwrap_or_default(); + Self::from_stream(iter_from_callback( + make_request, + options.context.clone(), + || None, + |_| {}, + )) } /// Creates a [`ItemIterator

`] from a raw stream of [`Result

`](crate::Result

) values. @@ -523,7 +545,7 @@ impl

PageIterator

{ /// } /// let url = "https://example.com/my_paginated_api".parse().unwrap(); /// let mut base_req = Request::new(url, Method::Get); - /// let pager = PageIterator::from_callback(move |next_link: PagerState| { + /// let pager = PageIterator::from_callback(move |next_link: PagerState, ctx| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let api_version = api_version.clone(); @@ -542,7 +564,7 @@ impl

PageIterator

{ /// .append_pair("api-version", &api_version); /// } /// let resp = pipeline - /// .send(&Context::new(), &mut req, None) + /// .send(&ctx, &mut req, None) /// .await?; /// let (status, headers, body) = resp.deconstruct(); /// let result: ListItemsResult = json::from_json(&body)?; @@ -555,7 +577,7 @@ impl

PageIterator

{ /// None => PagerResult::Done { response: resp } /// }) /// } - /// }); + /// }, None); /// ``` /// /// To page results using headers: @@ -569,7 +591,7 @@ impl

PageIterator

{ /// } /// let url = "https://example.com/my_paginated_api".parse().unwrap(); /// let mut base_req = Request::new(url, Method::Get); - /// let pager = PageIterator::from_callback(move |continuation| { + /// let pager = PageIterator::from_callback(move |continuation, ctx| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let mut req = base_req.clone(); @@ -578,33 +600,38 @@ impl

PageIterator

{ /// req.insert_header("x-ms-continuation", continuation); /// } /// let resp: Response = pipeline - /// .send(&Context::new(), &mut req, None) + /// .send(&ctx, &mut req, None) /// .await? /// .into(); /// Ok(PagerResult::from_response_header(resp, &HeaderName::from_static("x-ms-continuation"))) /// } - /// }); + /// }, None); /// ``` pub fn from_callback< + 'a, // This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds. #[cfg(not(target_arch = "wasm32"))] C: AsRef + FromStr + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState) -> Fut + Send + 'static, + #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, Context<'a>) -> Fut + Send + 'static, #[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static, #[cfg(target_arch = "wasm32")] C: AsRef + FromStr + 'static, - #[cfg(target_arch = "wasm32")] F: Fn(PagerState) -> Fut + 'static, + #[cfg(target_arch = "wasm32")] F: Fn(PagerState, Context<'a>) -> Fut + 'static, #[cfg(target_arch = "wasm32")] Fut: Future>> + 'static, >( make_request: F, + options: Option>, ) -> Self where ::Err: fmt::Debug, + 'a: 'static, { + let options = options.unwrap_or_default(); let continuation_token = Arc::new(Mutex::new(None::)); let get_clone = continuation_token.clone(); let set_clone = continuation_token.clone(); let stream = iter_from_callback( make_request, + options.context.clone(), move || { if let Ok(token_guard) = get_clone.lock() { return token_guard @@ -723,56 +750,118 @@ enum State { Done, } +struct PagerStreamState<'a, C, F, G, S> { + state: State, + make_request: F, + get_next: G, + set_next: S, + ctx: Context<'a>, + added_span: bool, +} + fn iter_from_callback< + 'a, P, // This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds. #[cfg(not(target_arch = "wasm32"))] C: AsRef + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState) -> Fut + Send + 'static, + #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, Context<'a>) -> Fut + Send + 'static, #[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static, #[cfg(not(target_arch = "wasm32"))] G: Fn() -> Option + Send + 'static, #[cfg(not(target_arch = "wasm32"))] S: Fn(Option<&str>) + Send + 'static, #[cfg(target_arch = "wasm32")] C: AsRef + 'static, - #[cfg(target_arch = "wasm32")] F: Fn(PagerState) -> Fut + 'static, + #[cfg(target_arch = "wasm32")] F: Fn(PagerState, Context<'a>) -> Fut + 'static, #[cfg(target_arch = "wasm32")] Fut: Future>> + 'static, #[cfg(target_arch = "wasm32")] G: Fn() -> Option + 'static, #[cfg(target_arch = "wasm32")] S: Fn(Option<&str>) + 'static, >( make_request: F, + ctx: Context<'a>, get_next: G, set_next: S, -) -> impl Stream> + 'static { +) -> impl Stream> + 'static +where + 'a: 'static, +{ + // Keep `ctx` alive within the stream's state so callers can safely capture it + // (e.g., for tracing) without forcing non-'static borrows that trigger lifetime errors. unfold( - // We flow the `make_request` callback, 'get_next', and `set_next` through the state value so that we can avoid cloning. - (State::Init, make_request, get_next, set_next), - |(mut state, make_request, get_next, set_next)| async move { - if let Some(next_token) = get_next() { - state = State::More(next_token); + // Flow `make_request`, `get_next`, `set_next`, and `ctx` through the state to avoid cloning. + PagerStreamState:: { + state: State::Init, + make_request, + get_next, + set_next, + ctx, + added_span: false, + }, + |mut stream_state| async move { + if let Some(next_token) = (stream_state.get_next)() { + stream_state.state = State::More(next_token); } - let result = match state { - State::Init => make_request(PagerState::Initial).await, - State::More(n) => make_request(PagerState::More(n)).await, + let result = match stream_state.state { + State::Init => { + // At the very start of polling, create a span for the entire request, and attach it to the context + let span = create_public_api_span(&stream_state.ctx, None, None); + if let Some(ref s) = span { + stream_state.added_span = true; + stream_state.ctx = stream_state.ctx.with_value(s.clone()); + } + (stream_state.make_request)(PagerState::Initial, stream_state.ctx.clone()).await + } + State::More(n) => { + (stream_state.make_request)(PagerState::More(n), stream_state.ctx.clone()).await + } State::Done => { - set_next(None); + (stream_state.set_next)(None); return None; } }; let (item, next_state) = match result { - Err(e) => return Some((Err(e), (State::Done, make_request, get_next, set_next))), + Err(e) => { + stream_state.state = State::Done; + return Some((Err(e), stream_state)); + } Ok(PagerResult::More { response, continuation: next_token, }) => { - set_next(Some(next_token.as_ref())); + (stream_state.set_next)(Some(next_token.as_ref())); (Ok(response), State::More(next_token)) } Ok(PagerResult::Done { response }) => { - set_next(None); + // When the result is done, finalize the span. Note that we only do that if we created the span in the first place, + // otherwise it is the responsibility of the caller to end their span. + if stream_state.added_span { + if let Some(span) = stream_state.ctx.value::>() { + // P is unconstrained, so it's not possible to retrieve the status code for now. + + // // 5xx status codes SHOULD set status to Error. + // // The description should not be set because it can be inferred from "http.response.status_code". + // if response.status().is_server_error() { + // span.set_status(SpanStatus::Error { + // description: "".to_string(), + // }); + // } + // if response.status().is_client_error() + // || response.status().is_server_error() + // { + // span.set_attribute( + // "error.type", + // response.status().to_string().into(), + // ); + // } + + span.end(); + } + } + (stream_state.set_next)(None); (Ok(response), State::Done) } }; - // Flow 'make_request', 'get_next', and 'set_next' through to avoid cloning - Some((item, (next_state, make_request, get_next, set_next))) + // Propagate state (including `ctx`) without cloning. + stream_state.state = next_state; + Some((item, stream_state)) }, ) } @@ -781,8 +870,8 @@ fn iter_from_callback< mod tests { use crate::http::{ headers::{HeaderName, HeaderValue}, - pager::{PageIterator, Pager, PagerResult, PagerState}, - RawResponse, Response, StatusCode, + pager::{PageIterator, Pager, PagerOptions, PagerResult, PagerState}, + Context, RawResponse, Response, StatusCode, }; use async_trait::async_trait; use futures::{StreamExt as _, TryStreamExt as _}; @@ -808,81 +897,89 @@ mod tests { #[tokio::test] async fn callback_item_pagination() { - let pager: Pager = Pager::from_callback(|continuation| async move { - match continuation { - PagerState::Initial => Ok(PagerResult::More { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-1"), - )]) + let context = Context::new(); + let pager: Pager = Pager::from_callback( + |continuation, _| async move { + match continuation { + PagerState::Initial => Ok(PagerResult::More { + response: RawResponse::from_bytes( + StatusCode::Ok, + HashMap::from([( + HeaderName::from_static("x-test-header"), + HeaderValue::from_static("page-1"), + )]) + .into(), + r#"{"items":[1],"page":1}"#, + ) .into(), - r#"{"items":[1],"page":1}"#, - ) - .into(), - continuation: "1", - }), - PagerState::More("1") => Ok(PagerResult::More { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-2"), - )]) + continuation: "1", + }), + PagerState::More("1") => Ok(PagerResult::More { + response: RawResponse::from_bytes( + StatusCode::Ok, + HashMap::from([( + HeaderName::from_static("x-test-header"), + HeaderValue::from_static("page-2"), + )]) + .into(), + r#"{"items":[2],"page":2}"#, + ) .into(), - r#"{"items":[2],"page":2}"#, - ) - .into(), - continuation: "2", - }), - PagerState::More("2") => Ok(PagerResult::Done { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-3"), - )]) + continuation: "2", + }), + PagerState::More("2") => Ok(PagerResult::Done { + response: RawResponse::from_bytes( + StatusCode::Ok, + HashMap::from([( + HeaderName::from_static("x-test-header"), + HeaderValue::from_static("page-3"), + )]) + .into(), + r#"{"items":[3],"page":3}"#, + ) .into(), - r#"{"items":[3],"page":3}"#, - ) - .into(), - }), - _ => { - panic!("Unexpected continuation value") + }), + _ => { + panic!("Unexpected continuation value") + } } - } - }); + }, + Some(PagerOptions { context }), + ); let items: Vec = pager.try_collect().await.unwrap(); assert_eq!(vec![1, 2, 3], items.as_slice()) } #[tokio::test] async fn callback_item_pagination_error() { - let pager: Pager = Pager::from_callback(|continuation| async move { - match continuation { - PagerState::Initial => Ok(PagerResult::More { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-1"), - )]) + let context = Context::new(); + let pager: Pager = Pager::from_callback( + |continuation, _| async move { + match continuation { + PagerState::Initial => Ok(PagerResult::More { + response: RawResponse::from_bytes( + StatusCode::Ok, + HashMap::from([( + HeaderName::from_static("x-test-header"), + HeaderValue::from_static("page-1"), + )]) + .into(), + r#"{"items":[1],"page":1}"#, + ) .into(), - r#"{"items":[1],"page":1}"#, - ) - .into(), - continuation: "1", - }), - PagerState::More("1") => Err(typespec::Error::with_message( - typespec::error::ErrorKind::Other, - "yon request didst fail", - )), - _ => { - panic!("Unexpected continuation value") + continuation: "1", + }), + PagerState::More("1") => Err(typespec::Error::with_message( + typespec::error::ErrorKind::Other, + "yon request didst fail", + )), + _ => { + panic!("Unexpected continuation value") + } } - } - }); + }, + Some(PagerOptions { context }), + ); let pages: Vec> = pager .into_pages() .then(|r| async move { @@ -916,7 +1013,7 @@ mod tests { #[tokio::test] async fn page_iterator_with_continuation_token() { let make_callback = || { - |continuation: PagerState| async move { + |continuation: PagerState, _| async move { match continuation.as_deref() { PagerState::Initial => Ok(PagerResult::More { response: RawResponse::from_bytes( @@ -959,9 +1056,15 @@ mod tests { } }; + let ctx = Context::new(); + // Create the first PageIterator. - let mut first_pager: PageIterator> = - PageIterator::from_callback(make_callback()); + let mut first_pager: PageIterator> = PageIterator::from_callback( + make_callback(), + Some(PagerOptions { + context: ctx.clone(), + }), + ); // Should start with no continuation_token. assert_eq!(first_pager.continuation_token(), None); @@ -984,8 +1087,9 @@ mod tests { assert_eq!(continuation_token, "next-token-1"); // Create the second PageIterator. + let context = Context::new(); let mut second_pager: PageIterator> = - PageIterator::from_callback(make_callback()) + PageIterator::from_callback(make_callback(), Some(PagerOptions { context })) .with_continuation_token(continuation_token); // Should start with link to second page. diff --git a/sdk/core/azure_core/src/http/poller.rs b/sdk/core/azure_core/src/http/poller.rs index 4e733c2a39..2e38cc8e65 100644 --- a/sdk/core/azure_core/src/http/poller.rs +++ b/sdk/core/azure_core/src/http/poller.rs @@ -7,12 +7,14 @@ use crate::{ error::{ErrorKind, ErrorResponse}, http::{ headers::{HeaderName, Headers}, - Format, JsonFormat, Response, StatusCode, + policies::create_public_api_span, + Context, Format, JsonFormat, Response, StatusCode, }, sleep, time::{Duration, OffsetDateTime}, + tracing::{Span, SpanStatus}, }; -use futures::{stream::unfold, Stream, StreamExt}; +use futures::{channel::oneshot, stream::unfold, Stream, StreamExt}; use serde::Deserialize; use std::{ convert::Infallible, @@ -20,14 +22,18 @@ use std::{ future::{Future, IntoFuture}, pin::Pin, str::FromStr, - task::{Context, Poll}, + sync::Arc, + task::{Context as TaskContext, Poll}, }; +use typespec_client_core::http::ClientMethodOptions; /// Default retry time for long-running operations if no retry-after header is present /// /// This value is the same as the default used in other Azure SDKs e.g., /// const DEFAULT_RETRY_TIME: Duration = Duration::seconds(30); + +/// Minimum retry time for long-running operations const MIN_RETRY_TIME: Duration = Duration::seconds(1); /// Represents the state of a [`Poller`]. @@ -150,13 +156,13 @@ pub struct PollerOptions { /// The time to wait between polling intervals in absence of a `retry-after` header. /// /// The default is 30 seconds. The minimum time enforced by [`Poller::from_callback`] is 1 second. - pub frequency: Option, + pub frequency: Duration, } impl Default for PollerOptions { fn default() -> Self { Self { - frequency: Some(DEFAULT_RETRY_TIME), + frequency: DEFAULT_RETRY_TIME, } } } @@ -174,7 +180,7 @@ pub enum PollerResult { /// The HTTP response with the status monitor. response: Response, /// The optional client-specified [`Duration`] to wait before polling again. - retry_after: Option, + retry_after: Duration, /// The next link / continuation token. next: N, }, @@ -342,7 +348,7 @@ where target: Option>, } -impl Poller +impl<'a, M, F> Poller where M: StatusMonitor, F: Format + Send, @@ -389,7 +395,7 @@ where /// let url = "https://example.com/my_operation".parse().unwrap(); /// let mut req = Request::new(url, Method::Post); /// - /// let poller = Poller::from_callback(move |operation_url: PollerState| { + /// let poller = Poller::from_callback(move |operation_url: PollerState, ctx, options| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let api_version = api_version.clone(); @@ -406,7 +412,7 @@ where /// .append_pair("api-version", &api_version); /// /// let resp = pipeline - /// .send(&Context::new(), &mut req, None) + /// .send(&ctx, &mut req, None) /// .await?; /// let (status, headers, body) = resp.deconstruct(); /// let result: OperationResult = json::from_json(&body)?; @@ -419,7 +425,7 @@ where /// let operation_url = format!("https://example.com/operations/{}", result.id).parse()?; /// Ok(PollerResult::InProgress { /// response: resp, - /// retry_after: None, + /// retry_after: options.frequency, /// next: operation_url /// }) /// } @@ -441,25 +447,29 @@ where /// _ => Ok(PollerResult::Done { response: resp }) /// } /// } - /// }, None); + /// }, None, None); /// ``` pub fn from_callback< #[cfg(not(target_arch = "wasm32"))] N: Send + 'static, - #[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState) -> Fut + Send + 'static, + #[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState, Context<'a>, PollerOptions) -> Fut + Send + 'static, #[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static, #[cfg(target_arch = "wasm32")] N: 'static, - #[cfg(target_arch = "wasm32")] Fun: Fn(PollerState) -> Fut + 'static, + #[cfg(target_arch = "wasm32")] Fun: Fn(PollerState, Context<'a>, PollerOptions) -> Fut + 'static, #[cfg(target_arch = "wasm32")] Fut: Future>> + 'static, >( make_request: Fun, + method_options: Option>, options: Option, ) -> Self where + 'a: 'static, M: Send + 'static, M::Output: Send + 'static, M::Format: Send + 'static, { - let (stream, target) = create_poller_stream(make_request, options); + let method_options = method_options.unwrap_or_default(); + let options = options.unwrap_or_default(); + let (stream, target) = create_poller_stream(make_request, method_options, options); Self { stream: Box::pin(stream), target: Some(target), @@ -493,7 +503,7 @@ where { type Item = crate::Result>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { let state = self.project().stream.poll_next(cx); if let Poll::Ready(Some(Ok(ref response))) = state { check_status_code(response)?; @@ -579,50 +589,102 @@ enum State { Done, } +/// The type of the oneshot channel sender for the target future. +type TargetTransmitterType<'a, M> = (Pin>, Option>); + +/// Represents the state used for each iteration through the poller stream. +struct PollerStreamState<'a, M, N, Fun> +where + M: StatusMonitor, +{ + /// The current polling state (Init, InProgress, or Done) + state: State, + /// The callback function to make requests + make_request: Fun, + /// Optional channel sender for the target future + target_tx: Option>>, + /// The context for the operation + ctx: Context<'a>, + /// The poller options + options: PollerOptions, + /// Whether a span was added to the context + added_span: bool, +} + fn create_poller_stream< + 'a, M, F: Format, #[cfg(not(target_arch = "wasm32"))] N: Send + 'static, - #[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState) -> Fut + Send + 'static, + #[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState, Context<'a>, PollerOptions) -> Fut + Send + 'static, #[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static, #[cfg(target_arch = "wasm32")] N: 'static, - #[cfg(target_arch = "wasm32")] Fun: Fn(PollerState) -> Fut + 'static, + #[cfg(target_arch = "wasm32")] Fun: Fn(PollerState, Context<'a>, PollerOptions) -> Fut + 'static, #[cfg(target_arch = "wasm32")] Fut: Future>> + 'static, >( make_request: Fun, - options: Option, + method_options: ClientMethodOptions<'a>, + options: PollerOptions, ) -> ( impl Stream>> + 'static, BoxedFuture, ) where + 'a: 'static, M: StatusMonitor + 'static, M::Output: Send + 'static, M::Format: Send + 'static, { - use futures::channel::oneshot; - + let ctx = method_options.context.clone(); let (target_tx, target_rx) = oneshot::channel(); - let frequency = options - .unwrap_or_default() - .frequency - .unwrap_or(DEFAULT_RETRY_TIME); + assert!( - frequency >= MIN_RETRY_TIME, + options.frequency >= MIN_RETRY_TIME, "minimum polling frequency is 1 second" ); - let stream = unfold( // We flow the `make_request` callback through the state value to avoid cloning. - (State::Init, make_request, Some(target_tx)), - move |(state, make_request, target_tx)| async move { - let result = match state { - State::Init => make_request(PollerState::Initial).await, - State::InProgress(n) => make_request(PollerState::More(n)).await, - State::Done => return None, + PollerStreamState:: { + state: State::Init, + make_request, + target_tx: Some(target_tx), + ctx, + options, + added_span: false, + }, + move |mut poller_stream_state| async move { + let result = match poller_stream_state.state { + State::Init => { + // At the very start of polling, create a span for the entire request, and attach it to the context + let span = create_public_api_span(&poller_stream_state.ctx, None, None); + if let Some(ref s) = span { + poller_stream_state.added_span = true; + poller_stream_state.ctx = poller_stream_state.ctx.with_value(s.clone()); + } + (poller_stream_state.make_request)( + PollerState::Initial, + poller_stream_state.ctx.clone(), + poller_stream_state.options, + ) + .await + } + State::InProgress(n) => { + (poller_stream_state.make_request)( + PollerState::More(n), + poller_stream_state.ctx.clone(), + poller_stream_state.options, + ) + .await + } + State::Done => { + return None; + } }; let (item, next_state) = match result { - Err(e) => return Some((Err(e), (State::Done, make_request, target_tx))), + Err(e) => { + poller_stream_state.state = State::Done; + return Some((Err(e), poller_stream_state)); + } Ok(PollerResult::InProgress { response, retry_after, @@ -630,35 +692,86 @@ where }) => { // Note that test-proxy automatically adds a transform that zeroes an existing `after-retry` header during playback, so don't check at runtime: // - let duration = retry_after.unwrap_or(frequency); - - tracing::trace!("retry poller in {}s", duration.whole_seconds()); - sleep(duration).await; + tracing::trace!("retry poller in {}s", retry_after.whole_seconds()); + sleep(retry_after).await; (Ok(response), State::InProgress(n)) } + // Note that we will normally never reach this state. The normal progression of the `make_request` callback is to return `Succeeded` with a target future, + // and then the stream yields the final response and transitions to `Done` state. + // The only time that the `make_request` callback will normally enter the `Done` state directly is if the LRO fails or is canceled. Ok(PollerResult::Done { response }) => (Ok(response), State::Done), Ok(PollerResult::Succeeded { response, target: get_target, }) => { // Send the target callback through the channel - if let Some(tx) = target_tx { - let _ = tx.send(get_target()); + if let Some(tx) = poller_stream_state.target_tx.take() { + let _ = tx.send(( + get_target(), + if poller_stream_state.added_span { + Some(poller_stream_state.ctx.clone()) + } else { + None + }, + )); } - // Also yield the final status response and set target_tx to None since we consumed it - return Some((Ok(response), (State::Done, make_request, None))); + // Also yield the final status response + poller_stream_state.state = State::Done; + return Some((Ok(response), poller_stream_state)); } }; - // Flow 'make_request' and target_tx through to avoid cloning - Some((item, (next_state, make_request, target_tx))) + // Update state and return + poller_stream_state.state = next_state; + Some((item, poller_stream_state)) }, ); let target = Box::new(async move { match target_rx.await { - Ok(fut) => fut.await, + Ok(target_state) => { + // Await the target future to get the final response from the poller. + let res = target_state.0.await; + // If we added a span to the target, take the result of the final target future to finalize the span. + if let Some(ctx) = target_state.1 { + match &res { + Ok(response) => { + // When the result is done, finalize the span. Note that we only do that if we created the span in the first place, + // otherwise it is the responsibility of the caller to end their span. + if let Some(span) = ctx.value::>() { + // 5xx status codes SHOULD set status to Error. + // The description should not be set because it can be inferred from "http.response.status_code". + if response.status().is_server_error() { + span.set_status(SpanStatus::Error { + description: "".to_string(), + }); + } + if response.status().is_client_error() + || response.status().is_server_error() + { + span.set_attribute( + "error.type", + response.status().to_string().into(), + ); + } + + span.end(); + } + } + Err(err) => { + if let Some(span) = ctx.value::>() { + span.set_status(SpanStatus::Error { + description: err.to_string(), + }); + span.set_attribute("error.type", err.kind().to_string().into()); + span.end(); + } + } + } + } + res + } Err(err) => Err(crate::Error::with_error( ErrorKind::Other, err, @@ -675,11 +788,11 @@ pub fn get_retry_after( headers: &Headers, retry_headers: &[HeaderName], options: &PollerOptions, -) -> Option { +) -> Duration { #[cfg_attr(feature = "test", allow(unused_mut))] let duration = crate::http::policies::get_retry_after(headers, OffsetDateTime::now_utc, retry_headers) - .or(options.frequency); + .unwrap_or(options.frequency); #[cfg(feature = "test")] { @@ -689,17 +802,14 @@ pub fn get_retry_after( // we need to override the frequency for services which do not send back supported headers in their response. if matches!(headers.get_optional::(), Ok(Some(mode)) if mode == RecordingMode::Playback) { - match duration { - Some(duration) if duration > Duration::ZERO => { - tracing::debug!( - "overriding {}s poller retry in playback", - duration.whole_seconds() - ); - } - _ => {} + if duration > Duration::ZERO { + tracing::debug!( + "overriding {}s poller retry in playback", + duration.whole_seconds() + ); } - return Some(Duration::ZERO); + return Duration::ZERO; } } @@ -814,7 +924,7 @@ mod tests { }; let mut poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -829,7 +939,7 @@ mod tests { match test_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), _ => Ok(PollerResult::Done { response }), @@ -837,6 +947,7 @@ mod tests { } }, None, + None, ); // First poll should succeed (201 Created with InProgress) @@ -894,9 +1005,8 @@ mod tests { .boxed() })) }; - let mut poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -914,7 +1024,7 @@ mod tests { match test_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), _ => Ok(PollerResult::Done { response }), @@ -922,6 +1032,7 @@ mod tests { } }, None, + None, ); // First poll should succeed (201 Created with InProgress) @@ -981,7 +1092,7 @@ mod tests { }; let mut poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -1000,7 +1111,7 @@ mod tests { match test_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), _ => Ok(PollerResult::Done { response }), @@ -1014,6 +1125,7 @@ mod tests { } }, None, + None, ); // First poll should succeed (200 OK with InProgress) @@ -1071,7 +1183,7 @@ mod tests { }; let poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -1086,7 +1198,7 @@ mod tests { match test_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), PollerStatus::Succeeded => { @@ -1113,6 +1225,7 @@ mod tests { } }, None, + None, ); // Use IntoFuture to await completion @@ -1173,7 +1286,7 @@ mod tests { }; let poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new( @@ -1191,7 +1304,7 @@ mod tests { match operation_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), PollerStatus::Succeeded => { @@ -1233,6 +1346,7 @@ mod tests { } }, None, + None, ); // Use IntoFuture to await completion @@ -1295,7 +1409,7 @@ mod tests { }; let poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -1310,7 +1424,7 @@ mod tests { match no_body_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), PollerStatus::Succeeded => { @@ -1333,6 +1447,7 @@ mod tests { } }, None, + None, ); // Use IntoFuture to await completion @@ -1381,7 +1496,7 @@ mod tests { }; let mut poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -1396,7 +1511,7 @@ mod tests { match test_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), _ => Ok(PollerResult::Done { response }), @@ -1404,6 +1519,7 @@ mod tests { } }, None, + None, ); // First poll should succeed (201 Created with InProgress) @@ -1467,7 +1583,7 @@ mod tests { }; let poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -1482,7 +1598,7 @@ mod tests { match test_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), PollerStatus::Succeeded => { @@ -1507,6 +1623,7 @@ mod tests { } }, None, + None, ); // Use IntoFuture to await completion @@ -1575,7 +1692,7 @@ mod tests { }; let poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -1590,7 +1707,7 @@ mod tests { match self_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), PollerStatus::Succeeded => { @@ -1618,6 +1735,7 @@ mod tests { } }, None, + None, ); // Use IntoFuture to await completion @@ -1687,7 +1805,7 @@ mod tests { }; let mut poller = Poller::from_callback( - move |_| { + move |_, _, _| { let client = mock_client.clone(); async move { let req = Request::new("https://example.com".parse().unwrap(), Method::Get); @@ -1702,7 +1820,7 @@ mod tests { match self_status.status() { PollerStatus::InProgress => Ok(PollerResult::InProgress { response, - retry_after: Some(Duration::ZERO), + retry_after: Duration::ZERO, next: (), }), PollerStatus::Succeeded => { @@ -1729,6 +1847,7 @@ mod tests { } }, None, + None, ); // Use as a stream to monitor progress diff --git a/sdk/core/azure_core_test/src/tracing.rs b/sdk/core/azure_core_test/src/tracing.rs index c2e7f4dc6d..dc4296144f 100644 --- a/sdk/core/azure_core_test/src/tracing.rs +++ b/sdk/core/azure_core_test/src/tracing.rs @@ -334,6 +334,12 @@ fn check_span_information( panic!("Expected attribute not found: {} = {:?}", key, value); } } + // Finally, ensure the span has been closed (`end()` was called). + assert!( + !*span.is_open.lock().unwrap(), + "Span {} was not ended", + span.name + ); } /// Information about an instrumented API call. diff --git a/sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs b/sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs index b65d97818e..b5d489e11b 100644 --- a/sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs +++ b/sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs @@ -6,7 +6,7 @@ mod signature_target; pub use authorization_policy::AuthorizationPolicy; use azure_core::http::{ - pager::PagerState, + pager::{PagerOptions, PagerState}, request::{options::ContentType, Request}, response::Response, ClientOptions, Context, Method, RawResponse, RetryOptions, @@ -112,24 +112,26 @@ impl CosmosPipeline { // First we clone the pipeline to pass it in to the closure let pipeline = self.pipeline.clone(); let ctx = ctx.with_value(resource_link).into_owned(); - Ok(FeedPager::from_callback(move |continuation| { - // Then we have to clone it again to pass it in to the async block. - // This is because Pageable can't borrow any data, it has to own it all. - // That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning. - let pipeline = pipeline.clone(); - let mut req = base_request.clone(); - let ctx = ctx.clone(); - async move { - if let PagerState::More(continuation) = continuation { - req.insert_header(constants::CONTINUATION, continuation); + Ok(FeedPager::from_callback( + move |continuation, ctx| { + // Then we have to clone it again to pass it in to the async block. + // This is because Pageable can't borrow any data, it has to own it all. + // That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning. + let pipeline = pipeline.clone(); + let mut req = base_request.clone(); + async move { + if let PagerState::More(continuation) = continuation { + req.insert_header(constants::CONTINUATION, continuation); + } + + let resp = pipeline.send(&ctx, &mut req, None).await?; + let page = FeedPage::::from_response(resp).await?; + + Ok(page.into()) } - - let resp = pipeline.send(&ctx, &mut req, None).await?; - let page = FeedPage::::from_response(resp).await?; - - Ok(page.into()) - } - })) + }, + Some(PagerOptions { context: ctx }), + )) } /// Helper function to read a throughput offer given a resource ID. diff --git a/sdk/keyvault/assets.json b/sdk/keyvault/assets.json index 3acda46933..b4cb587191 100644 --- a/sdk/keyvault/assets.json +++ b/sdk/keyvault/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "rust", "TagPrefix": "rust/keyvault", - "Tag": "rust/keyvault_bd60fd3a68" + "Tag": "rust/keyvault_af21e0c2ed" } diff --git a/sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md b/sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md index fac9382a66..26a4f38445 100644 --- a/sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md +++ b/sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md @@ -10,6 +10,7 @@ - Removed `CertificateClient::begin_create_certificate()`. - Removed `CertificateClient::resume_create_certificate()`. - Removed `wait()` function from `Poller`. +- Changed `PollerOptions::frequency` from `Option` to `Duration`. ### Bugs Fixed diff --git a/sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs b/sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs index 094e602915..cc561923b3 100644 --- a/sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs +++ b/sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs @@ -9,15 +9,12 @@ use azure_core::{ error::ErrorKind, http::{ headers::{RETRY_AFTER, RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS}, - policies::create_public_api_span, poller::{ get_retry_after, Poller, PollerResult, PollerState, PollerStatus, StatusMonitor as _, }, - Body, Method, RawResponse, Request, RequestContent, Url, + Body, Context, Method, RawResponse, Request, RequestContent, Url, }, - json, - tracing::{self, SpanStatus}, - Result, + json, tracing, Result, }; impl CertificateClient { @@ -76,7 +73,7 @@ impl CertificateClient { /// ``` #[tracing::function("KeyVault.createCertificate")] pub fn create_certificate( - &self, + &'_ self, certificate_name: &str, parameters: RequestContent, options: Option>, @@ -95,14 +92,9 @@ impl CertificateClient { let certificate_name = certificate_name.to_owned(); let parameters: Body = parameters.into(); - let mut ctx = options.method_options.context; - let span = create_public_api_span(&ctx, None, None); - if let Some(ref s) = span { - ctx = ctx.with_value(s.clone()); - } - + // let ctx = options.method_options.context; Ok(Poller::from_callback( - move |next_link: PollerState| { + move |next_link: PollerState, ctx: Context, poller_options| { let (mut request, next_link) = match next_link { PollerState::More(next_link) => { // Make sure the `api-version` is set appropriately. @@ -138,15 +130,13 @@ impl CertificateClient { let pipeline = pipeline.clone(); let api_version = api_version.clone(); - let ctx = ctx.clone(); - let span = span.clone(); async move { let rsp = pipeline.send(&ctx, &mut request, None).await?; let (status, headers, body) = rsp.deconstruct(); let retry_after = get_retry_after( &headers, &[RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS, RETRY_AFTER], - &options.poller_options, + &poller_options, ); let res: CertificateOperation = json::from_json(&body)?; let rsp = RawResponse::from_bytes(status, headers, body).into(); @@ -189,24 +179,6 @@ impl CertificateClient { let rsp: RawResponse = pipeline.send(&ctx, &mut request, None).await?; let (status, headers, body) = rsp.deconstruct(); - if let Some(span) = span { - // 5xx status codes SHOULD set status to Error. - // The description should not be set because it can be inferred from "http.response.status_code". - if status.is_server_error() { - span.set_status(SpanStatus::Error { - description: "".to_string(), - }); - } - if status.is_client_error() || status.is_server_error() - { - span.set_attribute( - "error.type", - status.to_string().into(), - ); - } - - span.end(); - } Ok(RawResponse::from_bytes(status, headers, body).into()) }) }), @@ -216,7 +188,8 @@ impl CertificateClient { }) } }, - None, + Some(options.method_options), + Some(options.poller_options), )) } } diff --git a/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs b/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs index 61363c960c..76ecccf760 100644 --- a/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs +++ b/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs @@ -31,7 +31,7 @@ use azure_core::{ error::CheckSuccessOptions, fmt::SafeDebug, http::{ - pager::{PagerResult, PagerState}, + pager::{PagerOptions, PagerResult, PagerState}, policies::{BearerTokenAuthorizationPolicy, Policy}, ClientOptions, Method, NoFormat, Pager, Pipeline, PipelineSendOptions, RawResponse, Request, RequestContent, Response, Url, @@ -689,51 +689,55 @@ impl CertificateClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListCertificatePropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListCertificatePropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// List the versions of a certificate. @@ -772,51 +776,55 @@ impl CertificateClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListCertificatePropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListCertificatePropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// Lists the deleted certificates in the specified vault currently available for recovery. @@ -851,51 +859,55 @@ impl CertificateClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListDeletedCertificatePropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListDeletedCertificatePropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// List certificate issuers for a specified key vault. @@ -924,51 +936,55 @@ impl CertificateClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListIssuerPropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListIssuerPropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// Merges a certificate or a certificate chain with a key pair existing on the server. diff --git a/sdk/keyvault/azure_security_keyvault_keys/src/generated/clients/key_client.rs b/sdk/keyvault/azure_security_keyvault_keys/src/generated/clients/key_client.rs index e877d618eb..9f37275d9f 100644 --- a/sdk/keyvault/azure_security_keyvault_keys/src/generated/clients/key_client.rs +++ b/sdk/keyvault/azure_security_keyvault_keys/src/generated/clients/key_client.rs @@ -25,7 +25,7 @@ use azure_core::{ error::CheckSuccessOptions, fmt::SafeDebug, http::{ - pager::{PagerResult, PagerState}, + pager::{PagerOptions, PagerResult, PagerState}, policies::{BearerTokenAuthorizationPolicy, Policy}, ClientOptions, Method, NoFormat, Pager, Pipeline, PipelineSendOptions, RawResponse, Request, RequestContent, Response, Url, @@ -686,51 +686,55 @@ impl KeyClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListDeletedKeyPropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListDeletedKeyPropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// List keys in the specified vault. @@ -760,51 +764,55 @@ impl KeyClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListKeyPropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListKeyPropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// Retrieves a list of individual key versions with the same key name. @@ -842,51 +850,55 @@ impl KeyClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListKeyPropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListKeyPropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// Permanently deletes the specified key. diff --git a/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs b/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs index 639d742627..9f7a3313ce 100644 --- a/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs +++ b/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs @@ -19,7 +19,7 @@ use azure_core::{ error::CheckSuccessOptions, fmt::SafeDebug, http::{ - pager::{PagerResult, PagerState}, + pager::{PagerOptions, PagerResult, PagerState}, policies::{BearerTokenAuthorizationPolicy, Policy}, ClientOptions, Method, NoFormat, Pager, Pipeline, PipelineSendOptions, RawResponse, Request, RequestContent, Response, Url, @@ -312,51 +312,55 @@ impl SecretClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListDeletedSecretPropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListDeletedSecretPropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// List secrets in a specified key vault. @@ -386,51 +390,55 @@ impl SecretClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListSecretPropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListSecretPropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// List all versions of the specified secret. @@ -469,51 +477,55 @@ impl SecretClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback(move |next_link: PagerState| { - let url = match next_link { - PagerState::More(next_link) => { - let qp = next_link - .query_pairs() - .filter(|(name, _)| name.ne("api-version")); - let mut next_link = next_link.clone(); - next_link - .query_pairs_mut() - .clear() - .extend_pairs(qp) - .append_pair("api-version", &api_version); - next_link + Ok(Pager::from_callback( + move |next_link: PagerState, ctx| { + let url = match next_link { + PagerState::More(next_link) => { + let qp = next_link + .query_pairs() + .filter(|(name, _)| name.ne("api-version")); + let mut next_link = next_link.clone(); + next_link + .query_pairs_mut() + .clear() + .extend_pairs(qp) + .append_pair("api-version", &api_version); + next_link + } + PagerState::Initial => first_url.clone(), + }; + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/json"); + let pipeline = pipeline.clone(); + async move { + let rsp = pipeline + .send( + &ctx, + &mut request, + Some(PipelineSendOptions { + check_success: CheckSuccessOptions { + success_codes: &[200], + }, + ..Default::default() + }), + ) + .await?; + let (status, headers, body) = rsp.deconstruct(); + let res: ListSecretPropertiesResult = json::from_json(&body)?; + let rsp = RawResponse::from_bytes(status, headers, body).into(); + Ok(match res.next_link { + Some(next_link) if !next_link.is_empty() => PagerResult::More { + response: rsp, + continuation: next_link.parse()?, + }, + _ => PagerResult::Done { response: rsp }, + }) } - PagerState::Initial => first_url.clone(), - }; - let mut request = Request::new(url, Method::Get); - request.insert_header("accept", "application/json"); - let ctx = options.method_options.context.clone(); - let pipeline = pipeline.clone(); - async move { - let rsp = pipeline - .send( - &ctx, - &mut request, - Some(PipelineSendOptions { - check_success: CheckSuccessOptions { - success_codes: &[200], - }, - ..Default::default() - }), - ) - .await?; - let (status, headers, body) = rsp.deconstruct(); - let res: ListSecretPropertiesResult = json::from_json(&body)?; - let rsp = RawResponse::from_bytes(status, headers, body).into(); - Ok(match res.next_link { - Some(next_link) if !next_link.is_empty() => PagerResult::More { - response: rsp, - continuation: next_link.parse()?, - }, - _ => PagerResult::Done { response: rsp }, - }) - } - })) + }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), + )) } /// Permanently deletes the specified secret. diff --git a/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs b/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs index 7226234fe5..d25842aeca 100644 --- a/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs +++ b/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs @@ -290,3 +290,218 @@ async fn round_trip_secret_verify_telemetry(ctx: TestContext) -> Result<()> { Ok(()) } + +#[recorded::test] +async fn list_secrets_verify_telemetry(ctx: TestContext) -> Result<()> { + use azure_core_test::tracing::ExpectedRestApiSpan; + + const SECRET_COUNT: usize = 50; + + let recording = ctx.recording(); + + { + let secret_client = { + let mut options = SecretClientOptions::default(); + recording.instrument(&mut options.client_options); + SecretClient::new( + recording.var("AZURE_KEYVAULT_URL", None).as_str(), + recording.credential(), + Some(options), + ) + }?; + for i in 1..=SECRET_COUNT { + let secret = secret_client + .set_secret( + &format!("secret-list-telemetry-{}", i), + SetSecretParameters { + value: Some(format!("secret-list-telemetry-value-{}", i)), + ..Default::default() + } + .try_into()?, + None, + ) + .await? + .into_body()?; + assert_eq!( + secret.value, + Some(format!("secret-list-telemetry-value-{}", i)) + ); + } + } + // Verify that the distributed tracing traces generated from the API call below match the expected traces. + let validate_result = azure_core_test::tracing::assert_instrumentation_information( + |tracer_provider| { + let mut options = SecretClientOptions::default(); + recording.instrument(&mut options.client_options); + options.client_options.instrumentation = InstrumentationOptions { + tracer_provider: Some(tracer_provider), + }; + SecretClient::new( + recording.var("AZURE_KEYVAULT_URL", None).as_str(), + recording.credential(), + Some(options), + ) + }, + |client: SecretClient| { + Box::pin(async move { + let mut secrets = client.list_secret_properties(None)?; + while let Some(secret) = secrets.try_next().await? { + let _ = secret.resource_id()?; + } + + Ok(()) + }) + }, + ExpectedInstrumentation { + package_name: recording.var("CARGO_PKG_NAME", None), + package_version: recording.var("CARGO_PKG_VERSION", None), + package_namespace: Some("KeyVault"), + api_calls: vec![ExpectedApiInformation { + api_name: Some("KeyVault.getSecrets"), + api_children: vec![ + ExpectedRestApiSpan { + api_verb: azure_core::http::Method::Get, + ..Default::default() + }, + ExpectedRestApiSpan { + api_verb: azure_core::http::Method::Get, + ..Default::default() + }, + ExpectedRestApiSpan { + api_verb: azure_core::http::Method::Get, + ..Default::default() + }, + ExpectedRestApiSpan { + api_verb: azure_core::http::Method::Get, + ..Default::default() + }, + ExpectedRestApiSpan { + api_verb: azure_core::http::Method::Get, + ..Default::default() + }, + ], + ..Default::default() + }], + }, + ) + .await; + + validate_result +} + +// Commented out for now until ItemIterator->PageIterator works for continuation tokens. + +// #[recorded::test] +// async fn list_secrets_verify_telemetry_rehydrated(ctx: TestContext) -> Result<()> { +// use azure_core_test::tracing::ExpectedRestApiSpan; + +// const SECRET_COUNT: usize = 50; + +// let recording = ctx.recording(); + +// { +// let secret_client = { +// let mut options = SecretClientOptions::default(); +// recording.instrument(&mut options.client_options); +// SecretClient::new( +// recording.var("AZURE_KEYVAULT_URL", None).as_str(), +// recording.credential(), +// Some(options), +// ) +// }?; +// for i in 1..=SECRET_COUNT { +// let secret = secret_client +// .set_secret( +// &format!("secret-rehydrate-telemetry-{}", i), +// SetSecretParameters { +// value: Some(format!("secret-rehydrate-telemetry-value-{}", i)), +// ..Default::default() +// } +// .try_into()?, +// None, +// ) +// .await? +// .into_body()?; +// assert_eq!( +// secret.value, +// Some(format!("secret-rehydrate-telemetry-value-{}", i)) +// ); +// } +// } +// // Verify that the distributed tracing traces generated from the API call below match the expected traces. +// let validate_result = azure_core_test::tracing::assert_instrumentation_information( +// |tracer_provider| { +// let mut options = SecretClientOptions::default(); +// recording.instrument(&mut options.client_options); +// options.client_options.instrumentation = InstrumentationOptions { +// tracer_provider: Some(tracer_provider), +// }; +// SecretClient::new( +// recording.var("AZURE_KEYVAULT_URL", None).as_str(), +// recording.credential(), +// Some(options), +// ) +// }, +// |client: SecretClient| { +// Box::pin(async move { +// let mut first_pager = client.list_secret_properties(None)?.into_pages(); + +// // Prime the iteration. +// let first_page = first_pager +// .try_next() +// .await? +// .expect("expected at least one page"); +// { +// let secrets = first_page.into_body()?; +// for secret in secrets.value { +// let _ = secret.resource_id()?; +// } +// } + +// let rehydration_token = first_pager +// .continuation_token() +// .expect("expected continuation token to be created after first page"); + +// let mut rehydrated_pager = client +// .list_secret_properties(None)? +// .into_pages() +// .with_continuation_token(rehydration_token); + +// while let Some(secret_page) = rehydrated_pager.try_next().await? { +// let secrets = secret_page.into_body()?; +// for secret in secrets.value { +// let _ = secret.resource_id()?; +// } +// } + +// Ok(()) +// }) +// }, +// ExpectedInstrumentation { +// package_name: recording.var("CARGO_PKG_NAME", None), +// package_version: recording.var("CARGO_PKG_VERSION", None), +// package_namespace: Some("KeyVault"), +// api_calls: vec![ExpectedApiInformation { +// api_name: Some("KeyVault.getSecrets"), +// api_children: vec![ +// ExpectedRestApiSpan { +// api_verb: azure_core::http::Method::Get, +// ..Default::default() +// }, +// ExpectedRestApiSpan { +// api_verb: azure_core::http::Method::Get, +// ..Default::default() +// }, +// ExpectedRestApiSpan { +// api_verb: azure_core::http::Method::Get, +// ..Default::default() +// }, +// ], +// ..Default::default() +// }], +// }, +// ) +// .await; + +// validate_result +// } diff --git a/sdk/storage/azure_storage_blob/src/generated/clients/blob_container_client.rs b/sdk/storage/azure_storage_blob/src/generated/clients/blob_container_client.rs index 455159d393..39d976729d 100644 --- a/sdk/storage/azure_storage_blob/src/generated/clients/blob_container_client.rs +++ b/sdk/storage/azure_storage_blob/src/generated/clients/blob_container_client.rs @@ -26,7 +26,7 @@ use azure_core::{ error::CheckSuccessOptions, fmt::SafeDebug, http::{ - pager::{PagerResult, PagerState}, + pager::{PagerOptions, PagerResult, PagerState}, policies::{BearerTokenAuthorizationPolicy, Policy}, ClientOptions, Method, NoFormat, PageIterator, Pipeline, PipelineSendOptions, RawResponse, Request, RequestContent, Response, Url, XmlFormat, @@ -837,7 +837,7 @@ impl BlobContainerClient { } let version = self.version.clone(); Ok(PageIterator::from_callback( - move |marker: PagerState| { + move |marker: PagerState, ctx| { let mut url = first_url.clone(); if let PagerState::More(marker) = marker { if url.query_pairs().any(|(name, _)| name.eq("marker")) { @@ -857,7 +857,6 @@ impl BlobContainerClient { request.insert_header("x-ms-client-request-id", client_request_id); } request.insert_header("x-ms-version", &version); - let ctx = options.method_options.context.clone(); let pipeline = pipeline.clone(); async move { let rsp = pipeline @@ -884,6 +883,9 @@ impl BlobContainerClient { }) } }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), )) } @@ -964,7 +966,7 @@ impl BlobContainerClient { } let version = self.version.clone(); Ok(PageIterator::from_callback( - move |marker: PagerState| { + move |marker: PagerState, ctx| { let mut url = first_url.clone(); if let PagerState::More(marker) = marker { if url.query_pairs().any(|(name, _)| name.eq("marker")) { @@ -984,7 +986,6 @@ impl BlobContainerClient { request.insert_header("x-ms-client-request-id", client_request_id); } request.insert_header("x-ms-version", &version); - let ctx = options.method_options.context.clone(); let pipeline = pipeline.clone(); async move { let rsp = pipeline @@ -1011,6 +1012,9 @@ impl BlobContainerClient { }) } }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), )) } diff --git a/sdk/storage/azure_storage_blob/src/generated/clients/blob_service_client.rs b/sdk/storage/azure_storage_blob/src/generated/clients/blob_service_client.rs index 27278e50ba..850c80cbfa 100644 --- a/sdk/storage/azure_storage_blob/src/generated/clients/blob_service_client.rs +++ b/sdk/storage/azure_storage_blob/src/generated/clients/blob_service_client.rs @@ -16,7 +16,7 @@ use azure_core::{ error::CheckSuccessOptions, fmt::SafeDebug, http::{ - pager::{PagerResult, PagerState}, + pager::{PagerOptions, PagerResult, PagerState}, policies::{BearerTokenAuthorizationPolicy, Policy}, ClientOptions, Method, NoFormat, PageIterator, Pipeline, PipelineSendOptions, RawResponse, Request, RequestContent, Response, Url, XmlFormat, @@ -445,7 +445,7 @@ impl BlobServiceClient { } let version = self.version.clone(); Ok(PageIterator::from_callback( - move |marker: PagerState| { + move |marker: PagerState, ctx| { let mut url = first_url.clone(); if let PagerState::More(marker) = marker { if url.query_pairs().any(|(name, _)| name.eq("marker")) { @@ -465,7 +465,6 @@ impl BlobServiceClient { request.insert_header("x-ms-client-request-id", client_request_id); } request.insert_header("x-ms-version", &version); - let ctx = options.method_options.context.clone(); let pipeline = pipeline.clone(); async move { let rsp = pipeline @@ -492,6 +491,9 @@ impl BlobServiceClient { }) } }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), )) } diff --git a/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs b/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs index f4d442dba6..46a522c025 100644 --- a/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs +++ b/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs @@ -16,7 +16,7 @@ use azure_core::{ error::CheckSuccessOptions, fmt::SafeDebug, http::{ - pager::{PagerResult, PagerState}, + pager::{PagerOptions, PagerResult, PagerState}, policies::{BearerTokenAuthorizationPolicy, Policy}, ClientOptions, Method, NoFormat, PageIterator, Pipeline, PipelineSendOptions, RawResponse, Request, RequestContent, Response, Url, XmlFormat, @@ -255,7 +255,7 @@ impl QueueServiceClient { } let version = self.version.clone(); Ok(PageIterator::from_callback( - move |marker: PagerState| { + move |marker: PagerState, ctx| { let mut url = first_url.clone(); if let PagerState::More(marker) = marker { if url.query_pairs().any(|(name, _)| name.eq("marker")) { @@ -275,7 +275,6 @@ impl QueueServiceClient { request.insert_header("x-ms-client-request-id", client_request_id); } request.insert_header("x-ms-version", &version); - let ctx = options.method_options.context.clone(); let pipeline = pipeline.clone(); async move { let rsp = pipeline @@ -302,6 +301,9 @@ impl QueueServiceClient { }) } }, + Some(PagerOptions { + context: options.method_options.context.clone(), + }), )) }