diff --git a/.gitignore b/.gitignore
index 59019604e1..d6e75923ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,6 +20,7 @@ vcpkg_installed/
# Editor user customizations.
.vscode/launch.json
+.vscode/mcp.json
.idea/
# Editor temp files.
diff --git a/sdk/core/azure_core/src/http/pager.rs b/sdk/core/azure_core/src/http/pager.rs
index 7b85a6244c..1a62dd1073 100644
--- a/sdk/core/azure_core/src/http/pager.rs
+++ b/sdk/core/azure_core/src/http/pager.rs
@@ -3,7 +3,11 @@
//! Types and methods for pageable responses.
-use crate::http::{headers::HeaderName, response::Response, DeserializeWith, Format, JsonFormat};
+use crate::http::{
+ headers::HeaderName, policies::create_public_api_span, response::Response, Context,
+ DeserializeWith, Format, JsonFormat,
+};
+use crate::tracing::Span;
use async_trait::async_trait;
use futures::{stream::unfold, FutureExt, Stream};
use std::{
@@ -205,6 +209,13 @@ type BoxedStream
= Box> + Send>;
#[cfg(target_arch = "wasm32")]
type BoxedStream = Box>>;
+/// Options for configuring a [`Pager`]'s behavior.
+#[derive(Clone, Debug, Default)]
+pub struct PagerOptions<'a> {
+ /// Context for HTTP requests made by the pager.
+ pub context: Context<'a>,
+}
+
/// Iterates over a collection of items or individual pages of items from a service.
///
/// You can asynchronously iterate over items returned by a collection request to a service,
@@ -302,7 +313,7 @@ impl ItemIterator {
/// }
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
/// let mut base_req = Request::new(url, Method::Get);
- /// let pager = ItemIterator::from_callback(move |next_link: PagerState| {
+ /// let pager = ItemIterator::from_callback(move |next_link: PagerState, ctx: Context| {
/// // The callback must be 'static, so you have to clone and move any values you want to use.
/// let pipeline = pipeline.clone();
/// let api_version = api_version.clone();
@@ -321,7 +332,7 @@ impl ItemIterator {
/// .append_pair("api-version", &api_version);
/// }
/// let resp = pipeline
- /// .send(&Context::new(), &mut req, None)
+ /// .send(&ctx, &mut req, None)
/// .await?;
/// let (status, headers, body) = resp.deconstruct();
/// let result: ListItemsResult = json::from_json(&body)?;
@@ -334,7 +345,7 @@ impl ItemIterator {
/// None => PagerResult::Done { response: resp }
/// })
/// }
- /// });
+ /// }, None);
/// ```
///
/// To page results using headers:
@@ -357,7 +368,7 @@ impl ItemIterator {
/// }
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
/// let mut base_req = Request::new(url, Method::Get);
- /// let pager = ItemIterator::from_callback(move |continuation| {
+ /// let pager = ItemIterator::from_callback(move |continuation, ctx| {
/// // The callback must be 'static, so you have to clone and move any values you want to use.
/// let pipeline = pipeline.clone();
/// let mut req = base_req.clone();
@@ -366,25 +377,36 @@ impl ItemIterator {
/// req.insert_header("x-ms-continuation", continuation);
/// }
/// let resp: Response = pipeline
- /// .send(&Context::new(), &mut req, None)
+ /// .send(&ctx, &mut req, None)
/// .await?
/// .into();
/// Ok(PagerResult::from_response_header(resp, &HeaderName::from_static("x-next-continuation")))
/// }
- /// });
+ /// },None);
/// ```
pub fn from_callback<
+ 'a,
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
#[cfg(not(target_arch = "wasm32"))] C: AsRef + Send + 'static,
- #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState) -> Fut + Send + 'static,
+ #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, Context<'a>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static,
#[cfg(target_arch = "wasm32")] C: AsRef + 'static,
- #[cfg(target_arch = "wasm32")] F: Fn(PagerState) -> Fut + 'static,
+ #[cfg(target_arch = "wasm32")] F: Fn(PagerState, Context<'a>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future>> + 'static,
>(
make_request: F,
- ) -> Self {
- Self::from_stream(iter_from_callback(make_request, || None, |_| {}))
+ options: Option>,
+ ) -> Self
+ where
+ 'a: 'static,
+ {
+ let options = options.unwrap_or_default();
+ Self::from_stream(iter_from_callback(
+ make_request,
+ options.context.clone(),
+ || None,
+ |_| {},
+ ))
}
/// Creates a [`ItemIterator`] from a raw stream of [`Result
`](crate::Result
) values.
@@ -523,7 +545,7 @@ impl
PageIterator
{
/// }
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
/// let mut base_req = Request::new(url, Method::Get);
- /// let pager = PageIterator::from_callback(move |next_link: PagerState| {
+ /// let pager = PageIterator::from_callback(move |next_link: PagerState, ctx| {
/// // The callback must be 'static, so you have to clone and move any values you want to use.
/// let pipeline = pipeline.clone();
/// let api_version = api_version.clone();
@@ -542,7 +564,7 @@ impl PageIterator
{
/// .append_pair("api-version", &api_version);
/// }
/// let resp = pipeline
- /// .send(&Context::new(), &mut req, None)
+ /// .send(&ctx, &mut req, None)
/// .await?;
/// let (status, headers, body) = resp.deconstruct();
/// let result: ListItemsResult = json::from_json(&body)?;
@@ -555,7 +577,7 @@ impl
PageIterator
{
/// None => PagerResult::Done { response: resp }
/// })
/// }
- /// });
+ /// }, None);
/// ```
///
/// To page results using headers:
@@ -569,7 +591,7 @@ impl
PageIterator
{
/// }
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
/// let mut base_req = Request::new(url, Method::Get);
- /// let pager = PageIterator::from_callback(move |continuation| {
+ /// let pager = PageIterator::from_callback(move |continuation, ctx| {
/// // The callback must be 'static, so you have to clone and move any values you want to use.
/// let pipeline = pipeline.clone();
/// let mut req = base_req.clone();
@@ -578,33 +600,38 @@ impl
PageIterator
{
/// req.insert_header("x-ms-continuation", continuation);
/// }
/// let resp: Response = pipeline
- /// .send(&Context::new(), &mut req, None)
+ /// .send(&ctx, &mut req, None)
/// .await?
/// .into();
/// Ok(PagerResult::from_response_header(resp, &HeaderName::from_static("x-ms-continuation")))
/// }
- /// });
+ /// }, None);
/// ```
pub fn from_callback<
+ 'a,
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
#[cfg(not(target_arch = "wasm32"))] C: AsRef + FromStr + Send + 'static,
- #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState) -> Fut + Send + 'static,
+ #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, Context<'a>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static,
#[cfg(target_arch = "wasm32")] C: AsRef + FromStr + 'static,
- #[cfg(target_arch = "wasm32")] F: Fn(PagerState) -> Fut + 'static,
+ #[cfg(target_arch = "wasm32")] F: Fn(PagerState, Context<'a>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future>> + 'static,
>(
make_request: F,
+ options: Option>,
) -> Self
where
::Err: fmt::Debug,
+ 'a: 'static,
{
+ let options = options.unwrap_or_default();
let continuation_token = Arc::new(Mutex::new(None::));
let get_clone = continuation_token.clone();
let set_clone = continuation_token.clone();
let stream = iter_from_callback(
make_request,
+ options.context.clone(),
move || {
if let Ok(token_guard) = get_clone.lock() {
return token_guard
@@ -723,56 +750,118 @@ enum State {
Done,
}
+struct PagerStreamState<'a, C, F, G, S> {
+ state: State,
+ make_request: F,
+ get_next: G,
+ set_next: S,
+ ctx: Context<'a>,
+ added_span: bool,
+}
+
fn iter_from_callback<
+ 'a,
P,
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
#[cfg(not(target_arch = "wasm32"))] C: AsRef + Send + 'static,
- #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState) -> Fut + Send + 'static,
+ #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, Context<'a>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] G: Fn() -> Option + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] S: Fn(Option<&str>) + Send + 'static,
#[cfg(target_arch = "wasm32")] C: AsRef + 'static,
- #[cfg(target_arch = "wasm32")] F: Fn(PagerState) -> Fut + 'static,
+ #[cfg(target_arch = "wasm32")] F: Fn(PagerState, Context<'a>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future>> + 'static,
#[cfg(target_arch = "wasm32")] G: Fn() -> Option + 'static,
#[cfg(target_arch = "wasm32")] S: Fn(Option<&str>) + 'static,
>(
make_request: F,
+ ctx: Context<'a>,
get_next: G,
set_next: S,
-) -> impl Stream- > + 'static {
+) -> impl Stream
- > + 'static
+where
+ 'a: 'static,
+{
+ // Keep `ctx` alive within the stream's state so callers can safely capture it
+ // (e.g., for tracing) without forcing non-'static borrows that trigger lifetime errors.
unfold(
- // We flow the `make_request` callback, 'get_next', and `set_next` through the state value so that we can avoid cloning.
- (State::Init, make_request, get_next, set_next),
- |(mut state, make_request, get_next, set_next)| async move {
- if let Some(next_token) = get_next() {
- state = State::More(next_token);
+ // Flow `make_request`, `get_next`, `set_next`, and `ctx` through the state to avoid cloning.
+ PagerStreamState::
{
+ state: State::Init,
+ make_request,
+ get_next,
+ set_next,
+ ctx,
+ added_span: false,
+ },
+ |mut stream_state| async move {
+ if let Some(next_token) = (stream_state.get_next)() {
+ stream_state.state = State::More(next_token);
}
- let result = match state {
- State::Init => make_request(PagerState::Initial).await,
- State::More(n) => make_request(PagerState::More(n)).await,
+ let result = match stream_state.state {
+ State::Init => {
+ // At the very start of polling, create a span for the entire request, and attach it to the context
+ let span = create_public_api_span(&stream_state.ctx, None, None);
+ if let Some(ref s) = span {
+ stream_state.added_span = true;
+ stream_state.ctx = stream_state.ctx.with_value(s.clone());
+ }
+ (stream_state.make_request)(PagerState::Initial, stream_state.ctx.clone()).await
+ }
+ State::More(n) => {
+ (stream_state.make_request)(PagerState::More(n), stream_state.ctx.clone()).await
+ }
State::Done => {
- set_next(None);
+ (stream_state.set_next)(None);
return None;
}
};
let (item, next_state) = match result {
- Err(e) => return Some((Err(e), (State::Done, make_request, get_next, set_next))),
+ Err(e) => {
+ stream_state.state = State::Done;
+ return Some((Err(e), stream_state));
+ }
Ok(PagerResult::More {
response,
continuation: next_token,
}) => {
- set_next(Some(next_token.as_ref()));
+ (stream_state.set_next)(Some(next_token.as_ref()));
(Ok(response), State::More(next_token))
}
Ok(PagerResult::Done { response }) => {
- set_next(None);
+ // When the result is done, finalize the span. Note that we only do that if we created the span in the first place,
+ // otherwise it is the responsibility of the caller to end their span.
+ if stream_state.added_span {
+ if let Some(span) = stream_state.ctx.value::>() {
+ // P is unconstrained, so it's not possible to retrieve the status code for now.
+
+ // // 5xx status codes SHOULD set status to Error.
+ // // The description should not be set because it can be inferred from "http.response.status_code".
+ // if response.status().is_server_error() {
+ // span.set_status(SpanStatus::Error {
+ // description: "".to_string(),
+ // });
+ // }
+ // if response.status().is_client_error()
+ // || response.status().is_server_error()
+ // {
+ // span.set_attribute(
+ // "error.type",
+ // response.status().to_string().into(),
+ // );
+ // }
+
+ span.end();
+ }
+ }
+ (stream_state.set_next)(None);
(Ok(response), State::Done)
}
};
- // Flow 'make_request', 'get_next', and 'set_next' through to avoid cloning
- Some((item, (next_state, make_request, get_next, set_next)))
+ // Propagate state (including `ctx`) without cloning.
+ stream_state.state = next_state;
+ Some((item, stream_state))
},
)
}
@@ -781,8 +870,8 @@ fn iter_from_callback<
mod tests {
use crate::http::{
headers::{HeaderName, HeaderValue},
- pager::{PageIterator, Pager, PagerResult, PagerState},
- RawResponse, Response, StatusCode,
+ pager::{PageIterator, Pager, PagerOptions, PagerResult, PagerState},
+ Context, RawResponse, Response, StatusCode,
};
use async_trait::async_trait;
use futures::{StreamExt as _, TryStreamExt as _};
@@ -808,81 +897,89 @@ mod tests {
#[tokio::test]
async fn callback_item_pagination() {
- let pager: Pager = Pager::from_callback(|continuation| async move {
- match continuation {
- PagerState::Initial => Ok(PagerResult::More {
- response: RawResponse::from_bytes(
- StatusCode::Ok,
- HashMap::from([(
- HeaderName::from_static("x-test-header"),
- HeaderValue::from_static("page-1"),
- )])
+ let context = Context::new();
+ let pager: Pager = Pager::from_callback(
+ |continuation, _| async move {
+ match continuation {
+ PagerState::Initial => Ok(PagerResult::More {
+ response: RawResponse::from_bytes(
+ StatusCode::Ok,
+ HashMap::from([(
+ HeaderName::from_static("x-test-header"),
+ HeaderValue::from_static("page-1"),
+ )])
+ .into(),
+ r#"{"items":[1],"page":1}"#,
+ )
.into(),
- r#"{"items":[1],"page":1}"#,
- )
- .into(),
- continuation: "1",
- }),
- PagerState::More("1") => Ok(PagerResult::More {
- response: RawResponse::from_bytes(
- StatusCode::Ok,
- HashMap::from([(
- HeaderName::from_static("x-test-header"),
- HeaderValue::from_static("page-2"),
- )])
+ continuation: "1",
+ }),
+ PagerState::More("1") => Ok(PagerResult::More {
+ response: RawResponse::from_bytes(
+ StatusCode::Ok,
+ HashMap::from([(
+ HeaderName::from_static("x-test-header"),
+ HeaderValue::from_static("page-2"),
+ )])
+ .into(),
+ r#"{"items":[2],"page":2}"#,
+ )
.into(),
- r#"{"items":[2],"page":2}"#,
- )
- .into(),
- continuation: "2",
- }),
- PagerState::More("2") => Ok(PagerResult::Done {
- response: RawResponse::from_bytes(
- StatusCode::Ok,
- HashMap::from([(
- HeaderName::from_static("x-test-header"),
- HeaderValue::from_static("page-3"),
- )])
+ continuation: "2",
+ }),
+ PagerState::More("2") => Ok(PagerResult::Done {
+ response: RawResponse::from_bytes(
+ StatusCode::Ok,
+ HashMap::from([(
+ HeaderName::from_static("x-test-header"),
+ HeaderValue::from_static("page-3"),
+ )])
+ .into(),
+ r#"{"items":[3],"page":3}"#,
+ )
.into(),
- r#"{"items":[3],"page":3}"#,
- )
- .into(),
- }),
- _ => {
- panic!("Unexpected continuation value")
+ }),
+ _ => {
+ panic!("Unexpected continuation value")
+ }
}
- }
- });
+ },
+ Some(PagerOptions { context }),
+ );
let items: Vec = pager.try_collect().await.unwrap();
assert_eq!(vec![1, 2, 3], items.as_slice())
}
#[tokio::test]
async fn callback_item_pagination_error() {
- let pager: Pager = Pager::from_callback(|continuation| async move {
- match continuation {
- PagerState::Initial => Ok(PagerResult::More {
- response: RawResponse::from_bytes(
- StatusCode::Ok,
- HashMap::from([(
- HeaderName::from_static("x-test-header"),
- HeaderValue::from_static("page-1"),
- )])
+ let context = Context::new();
+ let pager: Pager = Pager::from_callback(
+ |continuation, _| async move {
+ match continuation {
+ PagerState::Initial => Ok(PagerResult::More {
+ response: RawResponse::from_bytes(
+ StatusCode::Ok,
+ HashMap::from([(
+ HeaderName::from_static("x-test-header"),
+ HeaderValue::from_static("page-1"),
+ )])
+ .into(),
+ r#"{"items":[1],"page":1}"#,
+ )
.into(),
- r#"{"items":[1],"page":1}"#,
- )
- .into(),
- continuation: "1",
- }),
- PagerState::More("1") => Err(typespec::Error::with_message(
- typespec::error::ErrorKind::Other,
- "yon request didst fail",
- )),
- _ => {
- panic!("Unexpected continuation value")
+ continuation: "1",
+ }),
+ PagerState::More("1") => Err(typespec::Error::with_message(
+ typespec::error::ErrorKind::Other,
+ "yon request didst fail",
+ )),
+ _ => {
+ panic!("Unexpected continuation value")
+ }
}
- }
- });
+ },
+ Some(PagerOptions { context }),
+ );
let pages: Vec> = pager
.into_pages()
.then(|r| async move {
@@ -916,7 +1013,7 @@ mod tests {
#[tokio::test]
async fn page_iterator_with_continuation_token() {
let make_callback = || {
- |continuation: PagerState| async move {
+ |continuation: PagerState, _| async move {
match continuation.as_deref() {
PagerState::Initial => Ok(PagerResult::More {
response: RawResponse::from_bytes(
@@ -959,9 +1056,15 @@ mod tests {
}
};
+ let ctx = Context::new();
+
// Create the first PageIterator.
- let mut first_pager: PageIterator> =
- PageIterator::from_callback(make_callback());
+ let mut first_pager: PageIterator> = PageIterator::from_callback(
+ make_callback(),
+ Some(PagerOptions {
+ context: ctx.clone(),
+ }),
+ );
// Should start with no continuation_token.
assert_eq!(first_pager.continuation_token(), None);
@@ -984,8 +1087,9 @@ mod tests {
assert_eq!(continuation_token, "next-token-1");
// Create the second PageIterator.
+ let context = Context::new();
let mut second_pager: PageIterator> =
- PageIterator::from_callback(make_callback())
+ PageIterator::from_callback(make_callback(), Some(PagerOptions { context }))
.with_continuation_token(continuation_token);
// Should start with link to second page.
diff --git a/sdk/core/azure_core/src/http/poller.rs b/sdk/core/azure_core/src/http/poller.rs
index 4e733c2a39..2e38cc8e65 100644
--- a/sdk/core/azure_core/src/http/poller.rs
+++ b/sdk/core/azure_core/src/http/poller.rs
@@ -7,12 +7,14 @@ use crate::{
error::{ErrorKind, ErrorResponse},
http::{
headers::{HeaderName, Headers},
- Format, JsonFormat, Response, StatusCode,
+ policies::create_public_api_span,
+ Context, Format, JsonFormat, Response, StatusCode,
},
sleep,
time::{Duration, OffsetDateTime},
+ tracing::{Span, SpanStatus},
};
-use futures::{stream::unfold, Stream, StreamExt};
+use futures::{channel::oneshot, stream::unfold, Stream, StreamExt};
use serde::Deserialize;
use std::{
convert::Infallible,
@@ -20,14 +22,18 @@ use std::{
future::{Future, IntoFuture},
pin::Pin,
str::FromStr,
- task::{Context, Poll},
+ sync::Arc,
+ task::{Context as TaskContext, Poll},
};
+use typespec_client_core::http::ClientMethodOptions;
/// Default retry time for long-running operations if no retry-after header is present
///
/// This value is the same as the default used in other Azure SDKs e.g.,
///
const DEFAULT_RETRY_TIME: Duration = Duration::seconds(30);
+
+/// Minimum retry time for long-running operations
const MIN_RETRY_TIME: Duration = Duration::seconds(1);
/// Represents the state of a [`Poller`].
@@ -150,13 +156,13 @@ pub struct PollerOptions {
/// The time to wait between polling intervals in absence of a `retry-after` header.
///
/// The default is 30 seconds. The minimum time enforced by [`Poller::from_callback`] is 1 second.
- pub frequency: Option,
+ pub frequency: Duration,
}
impl Default for PollerOptions {
fn default() -> Self {
Self {
- frequency: Some(DEFAULT_RETRY_TIME),
+ frequency: DEFAULT_RETRY_TIME,
}
}
}
@@ -174,7 +180,7 @@ pub enum PollerResult {
/// The HTTP response with the status monitor.
response: Response,
/// The optional client-specified [`Duration`] to wait before polling again.
- retry_after: Option,
+ retry_after: Duration,
/// The next link / continuation token.
next: N,
},
@@ -342,7 +348,7 @@ where
target: Option>,
}
-impl Poller
+impl<'a, M, F> Poller
where
M: StatusMonitor,
F: Format + Send,
@@ -389,7 +395,7 @@ where
/// let url = "https://example.com/my_operation".parse().unwrap();
/// let mut req = Request::new(url, Method::Post);
///
- /// let poller = Poller::from_callback(move |operation_url: PollerState| {
+ /// let poller = Poller::from_callback(move |operation_url: PollerState, ctx, options| {
/// // The callback must be 'static, so you have to clone and move any values you want to use.
/// let pipeline = pipeline.clone();
/// let api_version = api_version.clone();
@@ -406,7 +412,7 @@ where
/// .append_pair("api-version", &api_version);
///
/// let resp = pipeline
- /// .send(&Context::new(), &mut req, None)
+ /// .send(&ctx, &mut req, None)
/// .await?;
/// let (status, headers, body) = resp.deconstruct();
/// let result: OperationResult = json::from_json(&body)?;
@@ -419,7 +425,7 @@ where
/// let operation_url = format!("https://example.com/operations/{}", result.id).parse()?;
/// Ok(PollerResult::InProgress {
/// response: resp,
- /// retry_after: None,
+ /// retry_after: options.frequency,
/// next: operation_url
/// })
/// }
@@ -441,25 +447,29 @@ where
/// _ => Ok(PollerResult::Done { response: resp })
/// }
/// }
- /// }, None);
+ /// }, None, None);
/// ```
pub fn from_callback<
#[cfg(not(target_arch = "wasm32"))] N: Send + 'static,
- #[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState) -> Fut + Send + 'static,
+ #[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState, Context<'a>, PollerOptions) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static,
#[cfg(target_arch = "wasm32")] N: 'static,
- #[cfg(target_arch = "wasm32")] Fun: Fn(PollerState) -> Fut + 'static,
+ #[cfg(target_arch = "wasm32")] Fun: Fn(PollerState, Context<'a>, PollerOptions) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future>> + 'static,
>(
make_request: Fun,
+ method_options: Option>,
options: Option,
) -> Self
where
+ 'a: 'static,
M: Send + 'static,
M::Output: Send + 'static,
M::Format: Send + 'static,
{
- let (stream, target) = create_poller_stream(make_request, options);
+ let method_options = method_options.unwrap_or_default();
+ let options = options.unwrap_or_default();
+ let (stream, target) = create_poller_stream(make_request, method_options, options);
Self {
stream: Box::pin(stream),
target: Some(target),
@@ -493,7 +503,7 @@ where
{
type Item = crate::Result>;
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ fn poll_next(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll > {
let state = self.project().stream.poll_next(cx);
if let Poll::Ready(Some(Ok(ref response))) = state {
check_status_code(response)?;
@@ -579,50 +589,102 @@ enum State {
Done,
}
+/// The type of the oneshot channel sender for the target future.
+type TargetTransmitterType<'a, M> = (Pin>, Option>);
+
+/// Represents the state used for each iteration through the poller stream.
+struct PollerStreamState<'a, M, N, Fun>
+where
+ M: StatusMonitor,
+{
+ /// The current polling state (Init, InProgress, or Done)
+ state: State,
+ /// The callback function to make requests
+ make_request: Fun,
+ /// Optional channel sender for the target future
+ target_tx: Option>>,
+ /// The context for the operation
+ ctx: Context<'a>,
+ /// The poller options
+ options: PollerOptions,
+ /// Whether a span was added to the context
+ added_span: bool,
+}
+
fn create_poller_stream<
+ 'a,
M,
F: Format,
#[cfg(not(target_arch = "wasm32"))] N: Send + 'static,
- #[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState) -> Fut + Send + 'static,
+ #[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState, Context<'a>, PollerOptions) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static,
#[cfg(target_arch = "wasm32")] N: 'static,
- #[cfg(target_arch = "wasm32")] Fun: Fn(PollerState) -> Fut + 'static,
+ #[cfg(target_arch = "wasm32")] Fun: Fn(PollerState, Context<'a>, PollerOptions) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future>> + 'static,
>(
make_request: Fun,
- options: Option,
+ method_options: ClientMethodOptions<'a>,
+ options: PollerOptions,
) -> (
impl Stream- >> + 'static,
BoxedFuture
,
)
where
+ 'a: 'static,
M: StatusMonitor + 'static,
M::Output: Send + 'static,
M::Format: Send + 'static,
{
- use futures::channel::oneshot;
-
+ let ctx = method_options.context.clone();
let (target_tx, target_rx) = oneshot::channel();
- let frequency = options
- .unwrap_or_default()
- .frequency
- .unwrap_or(DEFAULT_RETRY_TIME);
+
assert!(
- frequency >= MIN_RETRY_TIME,
+ options.frequency >= MIN_RETRY_TIME,
"minimum polling frequency is 1 second"
);
-
let stream = unfold(
// We flow the `make_request` callback through the state value to avoid cloning.
- (State::Init, make_request, Some(target_tx)),
- move |(state, make_request, target_tx)| async move {
- let result = match state {
- State::Init => make_request(PollerState::Initial).await,
- State::InProgress(n) => make_request(PollerState::More(n)).await,
- State::Done => return None,
+ PollerStreamState:: {
+ state: State::Init,
+ make_request,
+ target_tx: Some(target_tx),
+ ctx,
+ options,
+ added_span: false,
+ },
+ move |mut poller_stream_state| async move {
+ let result = match poller_stream_state.state {
+ State::Init => {
+ // At the very start of polling, create a span for the entire request, and attach it to the context
+ let span = create_public_api_span(&poller_stream_state.ctx, None, None);
+ if let Some(ref s) = span {
+ poller_stream_state.added_span = true;
+ poller_stream_state.ctx = poller_stream_state.ctx.with_value(s.clone());
+ }
+ (poller_stream_state.make_request)(
+ PollerState::Initial,
+ poller_stream_state.ctx.clone(),
+ poller_stream_state.options,
+ )
+ .await
+ }
+ State::InProgress(n) => {
+ (poller_stream_state.make_request)(
+ PollerState::More(n),
+ poller_stream_state.ctx.clone(),
+ poller_stream_state.options,
+ )
+ .await
+ }
+ State::Done => {
+ return None;
+ }
};
let (item, next_state) = match result {
- Err(e) => return Some((Err(e), (State::Done, make_request, target_tx))),
+ Err(e) => {
+ poller_stream_state.state = State::Done;
+ return Some((Err(e), poller_stream_state));
+ }
Ok(PollerResult::InProgress {
response,
retry_after,
@@ -630,35 +692,86 @@ where
}) => {
// Note that test-proxy automatically adds a transform that zeroes an existing `after-retry` header during playback, so don't check at runtime:
//
- let duration = retry_after.unwrap_or(frequency);
-
- tracing::trace!("retry poller in {}s", duration.whole_seconds());
- sleep(duration).await;
+ tracing::trace!("retry poller in {}s", retry_after.whole_seconds());
+ sleep(retry_after).await;
(Ok(response), State::InProgress(n))
}
+ // Note that we will normally never reach this state. The normal progression of the `make_request` callback is to return `Succeeded` with a target future,
+ // and then the stream yields the final response and transitions to `Done` state.
+ // The only time that the `make_request` callback will normally enter the `Done` state directly is if the LRO fails or is canceled.
Ok(PollerResult::Done { response }) => (Ok(response), State::Done),
Ok(PollerResult::Succeeded {
response,
target: get_target,
}) => {
// Send the target callback through the channel
- if let Some(tx) = target_tx {
- let _ = tx.send(get_target());
+ if let Some(tx) = poller_stream_state.target_tx.take() {
+ let _ = tx.send((
+ get_target(),
+ if poller_stream_state.added_span {
+ Some(poller_stream_state.ctx.clone())
+ } else {
+ None
+ },
+ ));
}
- // Also yield the final status response and set target_tx to None since we consumed it
- return Some((Ok(response), (State::Done, make_request, None)));
+ // Also yield the final status response
+ poller_stream_state.state = State::Done;
+ return Some((Ok(response), poller_stream_state));
}
};
- // Flow 'make_request' and target_tx through to avoid cloning
- Some((item, (next_state, make_request, target_tx)))
+ // Update state and return
+ poller_stream_state.state = next_state;
+ Some((item, poller_stream_state))
},
);
let target = Box::new(async move {
match target_rx.await {
- Ok(fut) => fut.await,
+ Ok(target_state) => {
+ // Await the target future to get the final response from the poller.
+ let res = target_state.0.await;
+ // If we added a span to the target, take the result of the final target future to finalize the span.
+ if let Some(ctx) = target_state.1 {
+ match &res {
+ Ok(response) => {
+ // When the result is done, finalize the span. Note that we only do that if we created the span in the first place,
+ // otherwise it is the responsibility of the caller to end their span.
+ if let Some(span) = ctx.value::>() {
+ // 5xx status codes SHOULD set status to Error.
+ // The description should not be set because it can be inferred from "http.response.status_code".
+ if response.status().is_server_error() {
+ span.set_status(SpanStatus::Error {
+ description: "".to_string(),
+ });
+ }
+ if response.status().is_client_error()
+ || response.status().is_server_error()
+ {
+ span.set_attribute(
+ "error.type",
+ response.status().to_string().into(),
+ );
+ }
+
+ span.end();
+ }
+ }
+ Err(err) => {
+ if let Some(span) = ctx.value::>() {
+ span.set_status(SpanStatus::Error {
+ description: err.to_string(),
+ });
+ span.set_attribute("error.type", err.kind().to_string().into());
+ span.end();
+ }
+ }
+ }
+ }
+ res
+ }
Err(err) => Err(crate::Error::with_error(
ErrorKind::Other,
err,
@@ -675,11 +788,11 @@ pub fn get_retry_after(
headers: &Headers,
retry_headers: &[HeaderName],
options: &PollerOptions,
-) -> Option {
+) -> Duration {
#[cfg_attr(feature = "test", allow(unused_mut))]
let duration =
crate::http::policies::get_retry_after(headers, OffsetDateTime::now_utc, retry_headers)
- .or(options.frequency);
+ .unwrap_or(options.frequency);
#[cfg(feature = "test")]
{
@@ -689,17 +802,14 @@ pub fn get_retry_after(
// we need to override the frequency for services which do not send back supported headers in their response.
if matches!(headers.get_optional::(), Ok(Some(mode)) if mode == RecordingMode::Playback)
{
- match duration {
- Some(duration) if duration > Duration::ZERO => {
- tracing::debug!(
- "overriding {}s poller retry in playback",
- duration.whole_seconds()
- );
- }
- _ => {}
+ if duration > Duration::ZERO {
+ tracing::debug!(
+ "overriding {}s poller retry in playback",
+ duration.whole_seconds()
+ );
}
- return Some(Duration::ZERO);
+ return Duration::ZERO;
}
}
@@ -814,7 +924,7 @@ mod tests {
};
let mut poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -829,7 +939,7 @@ mod tests {
match test_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
_ => Ok(PollerResult::Done { response }),
@@ -837,6 +947,7 @@ mod tests {
}
},
None,
+ None,
);
// First poll should succeed (201 Created with InProgress)
@@ -894,9 +1005,8 @@ mod tests {
.boxed()
}))
};
-
let mut poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -914,7 +1024,7 @@ mod tests {
match test_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
_ => Ok(PollerResult::Done { response }),
@@ -922,6 +1032,7 @@ mod tests {
}
},
None,
+ None,
);
// First poll should succeed (201 Created with InProgress)
@@ -981,7 +1092,7 @@ mod tests {
};
let mut poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -1000,7 +1111,7 @@ mod tests {
match test_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
_ => Ok(PollerResult::Done { response }),
@@ -1014,6 +1125,7 @@ mod tests {
}
},
None,
+ None,
);
// First poll should succeed (200 OK with InProgress)
@@ -1071,7 +1183,7 @@ mod tests {
};
let poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -1086,7 +1198,7 @@ mod tests {
match test_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
PollerStatus::Succeeded => {
@@ -1113,6 +1225,7 @@ mod tests {
}
},
None,
+ None,
);
// Use IntoFuture to await completion
@@ -1173,7 +1286,7 @@ mod tests {
};
let poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new(
@@ -1191,7 +1304,7 @@ mod tests {
match operation_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
PollerStatus::Succeeded => {
@@ -1233,6 +1346,7 @@ mod tests {
}
},
None,
+ None,
);
// Use IntoFuture to await completion
@@ -1295,7 +1409,7 @@ mod tests {
};
let poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -1310,7 +1424,7 @@ mod tests {
match no_body_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
PollerStatus::Succeeded => {
@@ -1333,6 +1447,7 @@ mod tests {
}
},
None,
+ None,
);
// Use IntoFuture to await completion
@@ -1381,7 +1496,7 @@ mod tests {
};
let mut poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -1396,7 +1511,7 @@ mod tests {
match test_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
_ => Ok(PollerResult::Done { response }),
@@ -1404,6 +1519,7 @@ mod tests {
}
},
None,
+ None,
);
// First poll should succeed (201 Created with InProgress)
@@ -1467,7 +1583,7 @@ mod tests {
};
let poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -1482,7 +1598,7 @@ mod tests {
match test_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
PollerStatus::Succeeded => {
@@ -1507,6 +1623,7 @@ mod tests {
}
},
None,
+ None,
);
// Use IntoFuture to await completion
@@ -1575,7 +1692,7 @@ mod tests {
};
let poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -1590,7 +1707,7 @@ mod tests {
match self_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
PollerStatus::Succeeded => {
@@ -1618,6 +1735,7 @@ mod tests {
}
},
None,
+ None,
);
// Use IntoFuture to await completion
@@ -1687,7 +1805,7 @@ mod tests {
};
let mut poller = Poller::from_callback(
- move |_| {
+ move |_, _, _| {
let client = mock_client.clone();
async move {
let req = Request::new("https://example.com".parse().unwrap(), Method::Get);
@@ -1702,7 +1820,7 @@ mod tests {
match self_status.status() {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
- retry_after: Some(Duration::ZERO),
+ retry_after: Duration::ZERO,
next: (),
}),
PollerStatus::Succeeded => {
@@ -1729,6 +1847,7 @@ mod tests {
}
},
None,
+ None,
);
// Use as a stream to monitor progress
diff --git a/sdk/core/azure_core_test/src/tracing.rs b/sdk/core/azure_core_test/src/tracing.rs
index c2e7f4dc6d..dc4296144f 100644
--- a/sdk/core/azure_core_test/src/tracing.rs
+++ b/sdk/core/azure_core_test/src/tracing.rs
@@ -334,6 +334,12 @@ fn check_span_information(
panic!("Expected attribute not found: {} = {:?}", key, value);
}
}
+ // Finally, ensure the span has been closed (`end()` was called).
+ assert!(
+ !*span.is_open.lock().unwrap(),
+ "Span {} was not ended",
+ span.name
+ );
}
/// Information about an instrumented API call.
diff --git a/sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs b/sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs
index b65d97818e..b5d489e11b 100644
--- a/sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs
+++ b/sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs
@@ -6,7 +6,7 @@ mod signature_target;
pub use authorization_policy::AuthorizationPolicy;
use azure_core::http::{
- pager::PagerState,
+ pager::{PagerOptions, PagerState},
request::{options::ContentType, Request},
response::Response,
ClientOptions, Context, Method, RawResponse, RetryOptions,
@@ -112,24 +112,26 @@ impl CosmosPipeline {
// First we clone the pipeline to pass it in to the closure
let pipeline = self.pipeline.clone();
let ctx = ctx.with_value(resource_link).into_owned();
- Ok(FeedPager::from_callback(move |continuation| {
- // Then we have to clone it again to pass it in to the async block.
- // This is because Pageable can't borrow any data, it has to own it all.
- // That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
- let pipeline = pipeline.clone();
- let mut req = base_request.clone();
- let ctx = ctx.clone();
- async move {
- if let PagerState::More(continuation) = continuation {
- req.insert_header(constants::CONTINUATION, continuation);
+ Ok(FeedPager::from_callback(
+ move |continuation, ctx| {
+ // Then we have to clone it again to pass it in to the async block.
+ // This is because Pageable can't borrow any data, it has to own it all.
+ // That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
+ let pipeline = pipeline.clone();
+ let mut req = base_request.clone();
+ async move {
+ if let PagerState::More(continuation) = continuation {
+ req.insert_header(constants::CONTINUATION, continuation);
+ }
+
+ let resp = pipeline.send(&ctx, &mut req, None).await?;
+ let page = FeedPage::::from_response(resp).await?;
+
+ Ok(page.into())
}
-
- let resp = pipeline.send(&ctx, &mut req, None).await?;
- let page = FeedPage::::from_response(resp).await?;
-
- Ok(page.into())
- }
- }))
+ },
+ Some(PagerOptions { context: ctx }),
+ ))
}
/// Helper function to read a throughput offer given a resource ID.
diff --git a/sdk/keyvault/assets.json b/sdk/keyvault/assets.json
index 3acda46933..b4cb587191 100644
--- a/sdk/keyvault/assets.json
+++ b/sdk/keyvault/assets.json
@@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "rust",
"TagPrefix": "rust/keyvault",
- "Tag": "rust/keyvault_bd60fd3a68"
+ "Tag": "rust/keyvault_af21e0c2ed"
}
diff --git a/sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md b/sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md
index fac9382a66..26a4f38445 100644
--- a/sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md
+++ b/sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md
@@ -10,6 +10,7 @@
- Removed `CertificateClient::begin_create_certificate()`.
- Removed `CertificateClient::resume_create_certificate()`.
- Removed `wait()` function from `Poller`.
+- Changed `PollerOptions::frequency` from `Option` to `Duration`.
### Bugs Fixed
diff --git a/sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs b/sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs
index 094e602915..cc561923b3 100644
--- a/sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs
+++ b/sdk/keyvault/azure_security_keyvault_certificates/src/clients.rs
@@ -9,15 +9,12 @@ use azure_core::{
error::ErrorKind,
http::{
headers::{RETRY_AFTER, RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS},
- policies::create_public_api_span,
poller::{
get_retry_after, Poller, PollerResult, PollerState, PollerStatus, StatusMonitor as _,
},
- Body, Method, RawResponse, Request, RequestContent, Url,
+ Body, Context, Method, RawResponse, Request, RequestContent, Url,
},
- json,
- tracing::{self, SpanStatus},
- Result,
+ json, tracing, Result,
};
impl CertificateClient {
@@ -76,7 +73,7 @@ impl CertificateClient {
/// ```
#[tracing::function("KeyVault.createCertificate")]
pub fn create_certificate(
- &self,
+ &'_ self,
certificate_name: &str,
parameters: RequestContent,
options: Option>,
@@ -95,14 +92,9 @@ impl CertificateClient {
let certificate_name = certificate_name.to_owned();
let parameters: Body = parameters.into();
- let mut ctx = options.method_options.context;
- let span = create_public_api_span(&ctx, None, None);
- if let Some(ref s) = span {
- ctx = ctx.with_value(s.clone());
- }
-
+ // let ctx = options.method_options.context;
Ok(Poller::from_callback(
- move |next_link: PollerState| {
+ move |next_link: PollerState, ctx: Context, poller_options| {
let (mut request, next_link) = match next_link {
PollerState::More(next_link) => {
// Make sure the `api-version` is set appropriately.
@@ -138,15 +130,13 @@ impl CertificateClient {
let pipeline = pipeline.clone();
let api_version = api_version.clone();
- let ctx = ctx.clone();
- let span = span.clone();
async move {
let rsp = pipeline.send(&ctx, &mut request, None).await?;
let (status, headers, body) = rsp.deconstruct();
let retry_after = get_retry_after(
&headers,
&[RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS, RETRY_AFTER],
- &options.poller_options,
+ &poller_options,
);
let res: CertificateOperation = json::from_json(&body)?;
let rsp = RawResponse::from_bytes(status, headers, body).into();
@@ -189,24 +179,6 @@ impl CertificateClient {
let rsp: RawResponse =
pipeline.send(&ctx, &mut request, None).await?;
let (status, headers, body) = rsp.deconstruct();
- if let Some(span) = span {
- // 5xx status codes SHOULD set status to Error.
- // The description should not be set because it can be inferred from "http.response.status_code".
- if status.is_server_error() {
- span.set_status(SpanStatus::Error {
- description: "".to_string(),
- });
- }
- if status.is_client_error() || status.is_server_error()
- {
- span.set_attribute(
- "error.type",
- status.to_string().into(),
- );
- }
-
- span.end();
- }
Ok(RawResponse::from_bytes(status, headers, body).into())
})
}),
@@ -216,7 +188,8 @@ impl CertificateClient {
})
}
},
- None,
+ Some(options.method_options),
+ Some(options.poller_options),
))
}
}
diff --git a/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs b/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs
index 61363c960c..76ecccf760 100644
--- a/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs
+++ b/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs
@@ -31,7 +31,7 @@ use azure_core::{
error::CheckSuccessOptions,
fmt::SafeDebug,
http::{
- pager::{PagerResult, PagerState},
+ pager::{PagerOptions, PagerResult, PagerState},
policies::{BearerTokenAuthorizationPolicy, Policy},
ClientOptions, Method, NoFormat, Pager, Pipeline, PipelineSendOptions, RawResponse,
Request, RequestContent, Response, Url,
@@ -689,51 +689,55 @@ impl CertificateClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListCertificatePropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListCertificatePropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// List the versions of a certificate.
@@ -772,51 +776,55 @@ impl CertificateClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListCertificatePropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListCertificatePropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// Lists the deleted certificates in the specified vault currently available for recovery.
@@ -851,51 +859,55 @@ impl CertificateClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListDeletedCertificatePropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListDeletedCertificatePropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// List certificate issuers for a specified key vault.
@@ -924,51 +936,55 @@ impl CertificateClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListIssuerPropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListIssuerPropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// Merges a certificate or a certificate chain with a key pair existing on the server.
diff --git a/sdk/keyvault/azure_security_keyvault_keys/src/generated/clients/key_client.rs b/sdk/keyvault/azure_security_keyvault_keys/src/generated/clients/key_client.rs
index e877d618eb..9f37275d9f 100644
--- a/sdk/keyvault/azure_security_keyvault_keys/src/generated/clients/key_client.rs
+++ b/sdk/keyvault/azure_security_keyvault_keys/src/generated/clients/key_client.rs
@@ -25,7 +25,7 @@ use azure_core::{
error::CheckSuccessOptions,
fmt::SafeDebug,
http::{
- pager::{PagerResult, PagerState},
+ pager::{PagerOptions, PagerResult, PagerState},
policies::{BearerTokenAuthorizationPolicy, Policy},
ClientOptions, Method, NoFormat, Pager, Pipeline, PipelineSendOptions, RawResponse,
Request, RequestContent, Response, Url,
@@ -686,51 +686,55 @@ impl KeyClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListDeletedKeyPropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListDeletedKeyPropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// List keys in the specified vault.
@@ -760,51 +764,55 @@ impl KeyClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListKeyPropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListKeyPropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// Retrieves a list of individual key versions with the same key name.
@@ -842,51 +850,55 @@ impl KeyClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListKeyPropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListKeyPropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// Permanently deletes the specified key.
diff --git a/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs b/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs
index 639d742627..9f7a3313ce 100644
--- a/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs
+++ b/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs
@@ -19,7 +19,7 @@ use azure_core::{
error::CheckSuccessOptions,
fmt::SafeDebug,
http::{
- pager::{PagerResult, PagerState},
+ pager::{PagerOptions, PagerResult, PagerState},
policies::{BearerTokenAuthorizationPolicy, Policy},
ClientOptions, Method, NoFormat, Pager, Pipeline, PipelineSendOptions, RawResponse,
Request, RequestContent, Response, Url,
@@ -312,51 +312,55 @@ impl SecretClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListDeletedSecretPropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListDeletedSecretPropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// List secrets in a specified key vault.
@@ -386,51 +390,55 @@ impl SecretClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListSecretPropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListSecretPropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// List all versions of the specified secret.
@@ -469,51 +477,55 @@ impl SecretClient {
.append_pair("maxresults", &maxresults.to_string());
}
let api_version = self.api_version.clone();
- Ok(Pager::from_callback(move |next_link: PagerState| {
- let url = match next_link {
- PagerState::More(next_link) => {
- let qp = next_link
- .query_pairs()
- .filter(|(name, _)| name.ne("api-version"));
- let mut next_link = next_link.clone();
- next_link
- .query_pairs_mut()
- .clear()
- .extend_pairs(qp)
- .append_pair("api-version", &api_version);
- next_link
+ Ok(Pager::from_callback(
+ move |next_link: PagerState, ctx| {
+ let url = match next_link {
+ PagerState::More(next_link) => {
+ let qp = next_link
+ .query_pairs()
+ .filter(|(name, _)| name.ne("api-version"));
+ let mut next_link = next_link.clone();
+ next_link
+ .query_pairs_mut()
+ .clear()
+ .extend_pairs(qp)
+ .append_pair("api-version", &api_version);
+ next_link
+ }
+ PagerState::Initial => first_url.clone(),
+ };
+ let mut request = Request::new(url, Method::Get);
+ request.insert_header("accept", "application/json");
+ let pipeline = pipeline.clone();
+ async move {
+ let rsp = pipeline
+ .send(
+ &ctx,
+ &mut request,
+ Some(PipelineSendOptions {
+ check_success: CheckSuccessOptions {
+ success_codes: &[200],
+ },
+ ..Default::default()
+ }),
+ )
+ .await?;
+ let (status, headers, body) = rsp.deconstruct();
+ let res: ListSecretPropertiesResult = json::from_json(&body)?;
+ let rsp = RawResponse::from_bytes(status, headers, body).into();
+ Ok(match res.next_link {
+ Some(next_link) if !next_link.is_empty() => PagerResult::More {
+ response: rsp,
+ continuation: next_link.parse()?,
+ },
+ _ => PagerResult::Done { response: rsp },
+ })
}
- PagerState::Initial => first_url.clone(),
- };
- let mut request = Request::new(url, Method::Get);
- request.insert_header("accept", "application/json");
- let ctx = options.method_options.context.clone();
- let pipeline = pipeline.clone();
- async move {
- let rsp = pipeline
- .send(
- &ctx,
- &mut request,
- Some(PipelineSendOptions {
- check_success: CheckSuccessOptions {
- success_codes: &[200],
- },
- ..Default::default()
- }),
- )
- .await?;
- let (status, headers, body) = rsp.deconstruct();
- let res: ListSecretPropertiesResult = json::from_json(&body)?;
- let rsp = RawResponse::from_bytes(status, headers, body).into();
- Ok(match res.next_link {
- Some(next_link) if !next_link.is_empty() => PagerResult::More {
- response: rsp,
- continuation: next_link.parse()?,
- },
- _ => PagerResult::Done { response: rsp },
- })
- }
- }))
+ },
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
+ ))
}
/// Permanently deletes the specified secret.
diff --git a/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs b/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs
index 7226234fe5..d25842aeca 100644
--- a/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs
+++ b/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs
@@ -290,3 +290,218 @@ async fn round_trip_secret_verify_telemetry(ctx: TestContext) -> Result<()> {
Ok(())
}
+
+#[recorded::test]
+async fn list_secrets_verify_telemetry(ctx: TestContext) -> Result<()> {
+ use azure_core_test::tracing::ExpectedRestApiSpan;
+
+ const SECRET_COUNT: usize = 50;
+
+ let recording = ctx.recording();
+
+ {
+ let secret_client = {
+ let mut options = SecretClientOptions::default();
+ recording.instrument(&mut options.client_options);
+ SecretClient::new(
+ recording.var("AZURE_KEYVAULT_URL", None).as_str(),
+ recording.credential(),
+ Some(options),
+ )
+ }?;
+ for i in 1..=SECRET_COUNT {
+ let secret = secret_client
+ .set_secret(
+ &format!("secret-list-telemetry-{}", i),
+ SetSecretParameters {
+ value: Some(format!("secret-list-telemetry-value-{}", i)),
+ ..Default::default()
+ }
+ .try_into()?,
+ None,
+ )
+ .await?
+ .into_body()?;
+ assert_eq!(
+ secret.value,
+ Some(format!("secret-list-telemetry-value-{}", i))
+ );
+ }
+ }
+ // Verify that the distributed tracing traces generated from the API call below match the expected traces.
+ let validate_result = azure_core_test::tracing::assert_instrumentation_information(
+ |tracer_provider| {
+ let mut options = SecretClientOptions::default();
+ recording.instrument(&mut options.client_options);
+ options.client_options.instrumentation = InstrumentationOptions {
+ tracer_provider: Some(tracer_provider),
+ };
+ SecretClient::new(
+ recording.var("AZURE_KEYVAULT_URL", None).as_str(),
+ recording.credential(),
+ Some(options),
+ )
+ },
+ |client: SecretClient| {
+ Box::pin(async move {
+ let mut secrets = client.list_secret_properties(None)?;
+ while let Some(secret) = secrets.try_next().await? {
+ let _ = secret.resource_id()?;
+ }
+
+ Ok(())
+ })
+ },
+ ExpectedInstrumentation {
+ package_name: recording.var("CARGO_PKG_NAME", None),
+ package_version: recording.var("CARGO_PKG_VERSION", None),
+ package_namespace: Some("KeyVault"),
+ api_calls: vec![ExpectedApiInformation {
+ api_name: Some("KeyVault.getSecrets"),
+ api_children: vec![
+ ExpectedRestApiSpan {
+ api_verb: azure_core::http::Method::Get,
+ ..Default::default()
+ },
+ ExpectedRestApiSpan {
+ api_verb: azure_core::http::Method::Get,
+ ..Default::default()
+ },
+ ExpectedRestApiSpan {
+ api_verb: azure_core::http::Method::Get,
+ ..Default::default()
+ },
+ ExpectedRestApiSpan {
+ api_verb: azure_core::http::Method::Get,
+ ..Default::default()
+ },
+ ExpectedRestApiSpan {
+ api_verb: azure_core::http::Method::Get,
+ ..Default::default()
+ },
+ ],
+ ..Default::default()
+ }],
+ },
+ )
+ .await;
+
+ validate_result
+}
+
+// Commented out for now until ItemIterator->PageIterator works for continuation tokens.
+
+// #[recorded::test]
+// async fn list_secrets_verify_telemetry_rehydrated(ctx: TestContext) -> Result<()> {
+// use azure_core_test::tracing::ExpectedRestApiSpan;
+
+// const SECRET_COUNT: usize = 50;
+
+// let recording = ctx.recording();
+
+// {
+// let secret_client = {
+// let mut options = SecretClientOptions::default();
+// recording.instrument(&mut options.client_options);
+// SecretClient::new(
+// recording.var("AZURE_KEYVAULT_URL", None).as_str(),
+// recording.credential(),
+// Some(options),
+// )
+// }?;
+// for i in 1..=SECRET_COUNT {
+// let secret = secret_client
+// .set_secret(
+// &format!("secret-rehydrate-telemetry-{}", i),
+// SetSecretParameters {
+// value: Some(format!("secret-rehydrate-telemetry-value-{}", i)),
+// ..Default::default()
+// }
+// .try_into()?,
+// None,
+// )
+// .await?
+// .into_body()?;
+// assert_eq!(
+// secret.value,
+// Some(format!("secret-rehydrate-telemetry-value-{}", i))
+// );
+// }
+// }
+// // Verify that the distributed tracing traces generated from the API call below match the expected traces.
+// let validate_result = azure_core_test::tracing::assert_instrumentation_information(
+// |tracer_provider| {
+// let mut options = SecretClientOptions::default();
+// recording.instrument(&mut options.client_options);
+// options.client_options.instrumentation = InstrumentationOptions {
+// tracer_provider: Some(tracer_provider),
+// };
+// SecretClient::new(
+// recording.var("AZURE_KEYVAULT_URL", None).as_str(),
+// recording.credential(),
+// Some(options),
+// )
+// },
+// |client: SecretClient| {
+// Box::pin(async move {
+// let mut first_pager = client.list_secret_properties(None)?.into_pages();
+
+// // Prime the iteration.
+// let first_page = first_pager
+// .try_next()
+// .await?
+// .expect("expected at least one page");
+// {
+// let secrets = first_page.into_body()?;
+// for secret in secrets.value {
+// let _ = secret.resource_id()?;
+// }
+// }
+
+// let rehydration_token = first_pager
+// .continuation_token()
+// .expect("expected continuation token to be created after first page");
+
+// let mut rehydrated_pager = client
+// .list_secret_properties(None)?
+// .into_pages()
+// .with_continuation_token(rehydration_token);
+
+// while let Some(secret_page) = rehydrated_pager.try_next().await? {
+// let secrets = secret_page.into_body()?;
+// for secret in secrets.value {
+// let _ = secret.resource_id()?;
+// }
+// }
+
+// Ok(())
+// })
+// },
+// ExpectedInstrumentation {
+// package_name: recording.var("CARGO_PKG_NAME", None),
+// package_version: recording.var("CARGO_PKG_VERSION", None),
+// package_namespace: Some("KeyVault"),
+// api_calls: vec![ExpectedApiInformation {
+// api_name: Some("KeyVault.getSecrets"),
+// api_children: vec![
+// ExpectedRestApiSpan {
+// api_verb: azure_core::http::Method::Get,
+// ..Default::default()
+// },
+// ExpectedRestApiSpan {
+// api_verb: azure_core::http::Method::Get,
+// ..Default::default()
+// },
+// ExpectedRestApiSpan {
+// api_verb: azure_core::http::Method::Get,
+// ..Default::default()
+// },
+// ],
+// ..Default::default()
+// }],
+// },
+// )
+// .await;
+
+// validate_result
+// }
diff --git a/sdk/storage/azure_storage_blob/src/generated/clients/blob_container_client.rs b/sdk/storage/azure_storage_blob/src/generated/clients/blob_container_client.rs
index 455159d393..39d976729d 100644
--- a/sdk/storage/azure_storage_blob/src/generated/clients/blob_container_client.rs
+++ b/sdk/storage/azure_storage_blob/src/generated/clients/blob_container_client.rs
@@ -26,7 +26,7 @@ use azure_core::{
error::CheckSuccessOptions,
fmt::SafeDebug,
http::{
- pager::{PagerResult, PagerState},
+ pager::{PagerOptions, PagerResult, PagerState},
policies::{BearerTokenAuthorizationPolicy, Policy},
ClientOptions, Method, NoFormat, PageIterator, Pipeline, PipelineSendOptions, RawResponse,
Request, RequestContent, Response, Url, XmlFormat,
@@ -837,7 +837,7 @@ impl BlobContainerClient {
}
let version = self.version.clone();
Ok(PageIterator::from_callback(
- move |marker: PagerState| {
+ move |marker: PagerState, ctx| {
let mut url = first_url.clone();
if let PagerState::More(marker) = marker {
if url.query_pairs().any(|(name, _)| name.eq("marker")) {
@@ -857,7 +857,6 @@ impl BlobContainerClient {
request.insert_header("x-ms-client-request-id", client_request_id);
}
request.insert_header("x-ms-version", &version);
- let ctx = options.method_options.context.clone();
let pipeline = pipeline.clone();
async move {
let rsp = pipeline
@@ -884,6 +883,9 @@ impl BlobContainerClient {
})
}
},
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
))
}
@@ -964,7 +966,7 @@ impl BlobContainerClient {
}
let version = self.version.clone();
Ok(PageIterator::from_callback(
- move |marker: PagerState| {
+ move |marker: PagerState, ctx| {
let mut url = first_url.clone();
if let PagerState::More(marker) = marker {
if url.query_pairs().any(|(name, _)| name.eq("marker")) {
@@ -984,7 +986,6 @@ impl BlobContainerClient {
request.insert_header("x-ms-client-request-id", client_request_id);
}
request.insert_header("x-ms-version", &version);
- let ctx = options.method_options.context.clone();
let pipeline = pipeline.clone();
async move {
let rsp = pipeline
@@ -1011,6 +1012,9 @@ impl BlobContainerClient {
})
}
},
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
))
}
diff --git a/sdk/storage/azure_storage_blob/src/generated/clients/blob_service_client.rs b/sdk/storage/azure_storage_blob/src/generated/clients/blob_service_client.rs
index 27278e50ba..850c80cbfa 100644
--- a/sdk/storage/azure_storage_blob/src/generated/clients/blob_service_client.rs
+++ b/sdk/storage/azure_storage_blob/src/generated/clients/blob_service_client.rs
@@ -16,7 +16,7 @@ use azure_core::{
error::CheckSuccessOptions,
fmt::SafeDebug,
http::{
- pager::{PagerResult, PagerState},
+ pager::{PagerOptions, PagerResult, PagerState},
policies::{BearerTokenAuthorizationPolicy, Policy},
ClientOptions, Method, NoFormat, PageIterator, Pipeline, PipelineSendOptions, RawResponse,
Request, RequestContent, Response, Url, XmlFormat,
@@ -445,7 +445,7 @@ impl BlobServiceClient {
}
let version = self.version.clone();
Ok(PageIterator::from_callback(
- move |marker: PagerState| {
+ move |marker: PagerState, ctx| {
let mut url = first_url.clone();
if let PagerState::More(marker) = marker {
if url.query_pairs().any(|(name, _)| name.eq("marker")) {
@@ -465,7 +465,6 @@ impl BlobServiceClient {
request.insert_header("x-ms-client-request-id", client_request_id);
}
request.insert_header("x-ms-version", &version);
- let ctx = options.method_options.context.clone();
let pipeline = pipeline.clone();
async move {
let rsp = pipeline
@@ -492,6 +491,9 @@ impl BlobServiceClient {
})
}
},
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
))
}
diff --git a/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs b/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs
index f4d442dba6..46a522c025 100644
--- a/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs
+++ b/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs
@@ -16,7 +16,7 @@ use azure_core::{
error::CheckSuccessOptions,
fmt::SafeDebug,
http::{
- pager::{PagerResult, PagerState},
+ pager::{PagerOptions, PagerResult, PagerState},
policies::{BearerTokenAuthorizationPolicy, Policy},
ClientOptions, Method, NoFormat, PageIterator, Pipeline, PipelineSendOptions, RawResponse,
Request, RequestContent, Response, Url, XmlFormat,
@@ -255,7 +255,7 @@ impl QueueServiceClient {
}
let version = self.version.clone();
Ok(PageIterator::from_callback(
- move |marker: PagerState| {
+ move |marker: PagerState, ctx| {
let mut url = first_url.clone();
if let PagerState::More(marker) = marker {
if url.query_pairs().any(|(name, _)| name.eq("marker")) {
@@ -275,7 +275,6 @@ impl QueueServiceClient {
request.insert_header("x-ms-client-request-id", client_request_id);
}
request.insert_header("x-ms-version", &version);
- let ctx = options.method_options.context.clone();
let pipeline = pipeline.clone();
async move {
let rsp = pipeline
@@ -302,6 +301,9 @@ impl QueueServiceClient {
})
}
},
+ Some(PagerOptions {
+ context: options.method_options.context.clone(),
+ }),
))
}