Skip to content

Commit 395caec

Browse files
Merge pull request #145 from SuperTux88/partial-pull
Partial Requests and some other pull refactorings
2 parents 4c700ef + f408f86 commit 395caec

File tree

1 file changed

+92
-53
lines changed

1 file changed

+92
-53
lines changed

src/client.rs

Lines changed: 92 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ use crate::Reference;
1919
use crate::errors::{OciDistributionError, Result};
2020
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
2121
use futures_util::future;
22-
use futures_util::stream::{self, StreamExt, TryStreamExt};
23-
use futures_util::Stream;
24-
use http::HeaderValue;
22+
use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
23+
use http::header::RANGE;
24+
use http::{HeaderValue, StatusCode};
2525
use http_auth::{parser::ChallengeParser, ChallengeRef};
2626
use olpc_cjson::CanonicalFormatter;
2727
use reqwest::header::HeaderMap;
28-
use reqwest::{RequestBuilder, Url};
28+
use reqwest::{RequestBuilder, Response, Url};
2929
use serde::Deserialize;
3030
use serde::Serialize;
3131
use sha2::Digest;
@@ -173,6 +173,22 @@ impl ImageLayer {
173173
}
174174
}
175175

176+
/// Stream response of a blob with optional content length if available
177+
pub struct SizedStream {
178+
/// The length of the stream if the upstream registry sent a `Content-Length` header
179+
pub content_length: Option<u64>,
180+
/// The stream of bytes
181+
pub stream: BoxStream<'static, std::result::Result<bytes::Bytes, std::io::Error>>,
182+
}
183+
184+
/// The response of a partial blob request
185+
pub enum BlobResponse {
186+
/// The response is a full blob (for example when partial requests aren't supported)
187+
Full(SizedStream),
188+
/// The response is a partial blob as requested
189+
Partial(SizedStream),
190+
}
191+
176192
/// The data and media type for a configuration object
177193
#[derive(Clone)]
178194
pub struct Config {
@@ -1021,39 +1037,7 @@ impl Client {
10211037
layer: impl AsLayerDescriptor,
10221038
mut out: T,
10231039
) -> Result<()> {
1024-
let layer = layer.as_layer_descriptor();
1025-
let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), layer.digest);
1026-
1027-
let mut response = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1028-
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1029-
.apply_auth(image, RegistryOperation::Pull)
1030-
.await?
1031-
.into_request_builder()
1032-
.send()
1033-
.await?;
1034-
1035-
if let Some(urls) = &layer.urls {
1036-
for url in urls {
1037-
if response.error_for_status_ref().is_ok() {
1038-
break;
1039-
}
1040-
1041-
let url = Url::parse(url)
1042-
.map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?;
1043-
1044-
if url.scheme() == "http" || url.scheme() == "https" {
1045-
// NOTE: we must not authenticate on additional URLs as those
1046-
// can be abused to leak credentials or tokens. Please
1047-
// refer to CVE-2020-15157 for more information.
1048-
response =
1049-
RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
1050-
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1051-
.into_request_builder()
1052-
.send()
1053-
.await?
1054-
}
1055-
}
1056-
}
1040+
let response = self.pull_blob_response(image, layer, None).await?;
10571041

10581042
let mut stream = response.error_for_status()?.bytes_stream();
10591043

@@ -1072,17 +1056,58 @@ impl Client {
10721056
&self,
10731057
image: &Reference,
10741058
layer: impl AsLayerDescriptor,
1075-
) -> Result<impl Stream<Item = std::result::Result<bytes::Bytes, std::io::Error>>> {
1059+
) -> Result<SizedStream> {
1060+
stream_from_response(self.pull_blob_response(image, layer, None).await?)
1061+
}
1062+
1063+
/// Stream a single layer from an OCI registry starting with a byte offset.
1064+
/// This can be used to continue downloading a layer after a network error.
1065+
///
1066+
/// Returns [`Stream`](futures_util::Stream).
1067+
pub async fn pull_blob_stream_partial(
1068+
&self,
1069+
image: &Reference,
1070+
layer: impl AsLayerDescriptor,
1071+
offset: u64,
1072+
) -> Result<BlobResponse> {
1073+
let response = self.pull_blob_response(image, layer, Some(offset)).await?;
1074+
1075+
let status = response.status();
1076+
match status {
1077+
StatusCode::OK => Ok(BlobResponse::Full(stream_from_response(response)?)),
1078+
StatusCode::PARTIAL_CONTENT => {
1079+
Ok(BlobResponse::Partial(stream_from_response(response)?))
1080+
}
1081+
_ => Err(OciDistributionError::ServerError {
1082+
code: status.as_u16(),
1083+
url: response.url().to_string(),
1084+
message: response.text().await?,
1085+
}),
1086+
}
1087+
}
1088+
1089+
/// Pull a single layer from an OCI registry.
1090+
async fn pull_blob_response(
1091+
&self,
1092+
image: &Reference,
1093+
layer: impl AsLayerDescriptor,
1094+
offset: Option<u64>,
1095+
) -> Result<Response> {
10761096
let layer = layer.as_layer_descriptor();
10771097
let url = self.to_v2_blob_url(image.resolve_registry(), image.repository(), layer.digest);
10781098

1079-
let mut response = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
1099+
let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url))
10801100
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
10811101
.apply_auth(image, RegistryOperation::Pull)
10821102
.await?
1083-
.into_request_builder()
1084-
.send()
1085-
.await?;
1103+
.into_request_builder();
1104+
if let Some(offset) = offset {
1105+
request = request.header(
1106+
RANGE,
1107+
HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1108+
);
1109+
}
1110+
let mut response = request.send().await?;
10861111

10871112
if let Some(urls) = &layer.urls {
10881113
for url in urls {
@@ -1097,22 +1122,22 @@ impl Client {
10971122
// NOTE: we must not authenticate on additional URLs as those
10981123
// can be abused to leak credentials or tokens. Please
10991124
// refer to CVE-2020-15157 for more information.
1100-
response =
1125+
request =
11011126
RequestBuilderWrapper::from_client(self, |client| client.get(url.clone()))
11021127
.apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)?
1103-
.into_request_builder()
1104-
.send()
1105-
.await?
1128+
.into_request_builder();
1129+
if let Some(offset) = offset {
1130+
request = request.header(
1131+
RANGE,
1132+
HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(),
1133+
);
1134+
}
1135+
response = request.send().await?
11061136
}
11071137
}
11081138
}
11091139

1110-
let stream = response
1111-
.error_for_status()?
1112-
.bytes_stream()
1113-
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
1114-
1115-
Ok(stream)
1140+
Ok(response)
11161141
}
11171142

11181143
/// Begins a session to push an image to registry in a monolithical way
@@ -1511,6 +1536,19 @@ fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &st
15111536
}
15121537
}
15131538

1539+
/// Converts a response into a stream
1540+
fn stream_from_response(response: Response) -> Result<SizedStream> {
1541+
let content_length = response.content_length();
1542+
let stream = response
1543+
.error_for_status()?
1544+
.bytes_stream()
1545+
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
1546+
Ok(SizedStream {
1547+
content_length,
1548+
stream: Box::pin(stream),
1549+
})
1550+
}
1551+
15141552
/// The request builder wrapper allows to be instantiated from a
15151553
/// `Client` and allows composable operations on the request builder,
15161554
/// to produce a `RequestBuilder` object that can be executed.
@@ -2567,7 +2605,8 @@ mod test {
25672605
.await
25682606
.expect("failed to pull blob stream");
25692607

2570-
AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream), &mut file)
2608+
assert_eq!(layer_stream.content_length, Some(layer0.size as u64));
2609+
AsyncReadExt::read_to_end(&mut StreamReader::new(layer_stream.stream), &mut file)
25712610
.await
25722611
.unwrap();
25732612

0 commit comments

Comments
 (0)