Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ vcpkg_installed/

# Editor user customizations.
.vscode/launch.json
.vscode/mcp.json
.idea/

# Editor temp files.
Expand Down
314 changes: 209 additions & 105 deletions sdk/core/azure_core/src/http/pager.rs

Large diffs are not rendered by default.

271 changes: 195 additions & 76 deletions sdk/core/azure_core/src/http/poller.rs

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions sdk/core/azure_core_test/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ fn check_span_information(
panic!("Expected attribute not found: {} = {:?}", key, value);
}
}
// Finally, ensure the span has been closed (`end()` was called).
assert!(
!*span.is_open.lock().unwrap(),
"Span {} was not ended",
span.name
);
}

/// Information about an instrumented API call.
Expand Down
38 changes: 20 additions & 18 deletions sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod signature_target;

pub use authorization_policy::AuthorizationPolicy;
use azure_core::http::{
pager::PagerState,
pager::{PagerOptions, PagerState},
request::{options::ContentType, Request},
response::Response,
ClientOptions, Context, Method, RawResponse, RetryOptions,
Expand Down Expand Up @@ -112,24 +112,26 @@ impl CosmosPipeline {
// First we clone the pipeline to pass it in to the closure
let pipeline = self.pipeline.clone();
let ctx = ctx.with_value(resource_link).into_owned();
Ok(FeedPager::from_callback(move |continuation| {
// Then we have to clone it again to pass it in to the async block.
// This is because Pageable can't borrow any data, it has to own it all.
// That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
let pipeline = pipeline.clone();
let mut req = base_request.clone();
let ctx = ctx.clone();
async move {
if let PagerState::More(continuation) = continuation {
req.insert_header(constants::CONTINUATION, continuation);
Ok(FeedPager::from_callback(
move |continuation, ctx| {
// Then we have to clone it again to pass it in to the async block.
// This is because Pageable can't borrow any data, it has to own it all.
// That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
let pipeline = pipeline.clone();
let mut req = base_request.clone();
async move {
if let PagerState::More(continuation) = continuation {
req.insert_header(constants::CONTINUATION, continuation);
}

let resp = pipeline.send(&ctx, &mut req, None).await?;
let page = FeedPage::<T>::from_response(resp).await?;

Ok(page.into())
}

let resp = pipeline.send(&ctx, &mut req, None).await?;
let page = FeedPage::<T>::from_response(resp).await?;

Ok(page.into())
}
}))
},
Some(PagerOptions { context: ctx }),
))
}

/// Helper function to read a throughput offer given a resource ID.
Expand Down
2 changes: 1 addition & 1 deletion sdk/keyvault/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "rust",
"TagPrefix": "rust/keyvault",
"Tag": "rust/keyvault_bd60fd3a68"
"Tag": "rust/keyvault_af21e0c2ed"
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Removed `CertificateClient::begin_create_certificate()`.
- Removed `CertificateClient::resume_create_certificate()`.
- Removed `wait()` function from `Poller<CertificateOperation>`.
- Changed `PollerOptions::frequency` from `Option<Duration>` to `Duration`.

### Bugs Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@ use azure_core::{
error::ErrorKind,
http::{
headers::{RETRY_AFTER, RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS},
policies::create_public_api_span,
poller::{
get_retry_after, Poller, PollerResult, PollerState, PollerStatus, StatusMonitor as _,
},
Body, Method, RawResponse, Request, RequestContent, Url,
Body, Context, Method, RawResponse, Request, RequestContent, Url,
},
json,
tracing::{self, SpanStatus},
Result,
json, tracing, Result,
};

impl CertificateClient {
Expand Down Expand Up @@ -76,7 +73,7 @@ impl CertificateClient {
/// ```
#[tracing::function("KeyVault.createCertificate")]
pub fn create_certificate(
&self,
&'_ self,
certificate_name: &str,
parameters: RequestContent<CreateCertificateParameters>,
options: Option<CertificateClientCreateCertificateOptions<'_>>,
Expand All @@ -95,14 +92,9 @@ impl CertificateClient {
let certificate_name = certificate_name.to_owned();
let parameters: Body = parameters.into();

let mut ctx = options.method_options.context;
let span = create_public_api_span(&ctx, None, None);
if let Some(ref s) = span {
ctx = ctx.with_value(s.clone());
}

// let ctx = options.method_options.context;
Ok(Poller::from_callback(
move |next_link: PollerState<Url>| {
move |next_link: PollerState<Url>, ctx: Context, poller_options| {
let (mut request, next_link) = match next_link {
PollerState::More(next_link) => {
// Make sure the `api-version` is set appropriately.
Expand Down Expand Up @@ -138,15 +130,13 @@ impl CertificateClient {

let pipeline = pipeline.clone();
let api_version = api_version.clone();
let ctx = ctx.clone();
let span = span.clone();
async move {
let rsp = pipeline.send(&ctx, &mut request, None).await?;
let (status, headers, body) = rsp.deconstruct();
let retry_after = get_retry_after(
&headers,
&[RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS, RETRY_AFTER],
&options.poller_options,
&poller_options,
);
let res: CertificateOperation = json::from_json(&body)?;
let rsp = RawResponse::from_bytes(status, headers, body).into();
Expand Down Expand Up @@ -189,24 +179,6 @@ impl CertificateClient {
let rsp: RawResponse =
pipeline.send(&ctx, &mut request, None).await?;
let (status, headers, body) = rsp.deconstruct();
if let Some(span) = span {
// 5xx status codes SHOULD set status to Error.
// The description should not be set because it can be inferred from "http.response.status_code".
if status.is_server_error() {
span.set_status(SpanStatus::Error {
description: "".to_string(),
});
}
if status.is_client_error() || status.is_server_error()
{
span.set_attribute(
"error.type",
status.to_string().into(),
);
}

span.end();
}
Ok(RawResponse::from_bytes(status, headers, body).into())
})
}),
Expand All @@ -216,7 +188,8 @@ impl CertificateClient {
})
}
},
None,
Some(options.method_options),
Some(options.poller_options),
))
}
}
Loading