Skip to content

Commit a0dd43f

Browse files
committed
Consolidate backends with their invokers
1 parent 8c32043 commit a0dd43f

File tree

11 files changed

+202
-207
lines changed

11 files changed

+202
-207
lines changed

Cargo.toml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@ categories = ["science::geo"]
1212

1313
[features]
1414
default = []
15-
http-async = ["dep:tokio", "dep:reqwest"]
16-
s3-async-native = ["dep:tokio", "dep:rust-s3", "rust-s3?/tokio-native-tls"]
17-
s3-async-rustls = ["dep:tokio", "dep:rust-s3", "rust-s3?/tokio-rustls-tls"]
18-
mmap-async-tokio = ["dep:tokio", "dep:fmmap", "fmmap?/tokio-async", "async-compression?/tokio"]
15+
http-async = ["__async", "dep:reqwest"]
16+
mmap-async-tokio = ["__async", "dep:fmmap", "fmmap?/tokio-async"]
17+
s3-async-native = ["__async-s3"]
18+
s3-async-rustls = ["__async-s3"]
1919
tilejson = ["dep:tilejson", "dep:serde", "dep:serde_json"]
2020

21+
# Internal features, do not use
22+
__async = ["dep:tokio", "async-compression/tokio"]
23+
__async-s3 = ["__async", "dep:rust-s3", "rust-s3?/tokio-native-tls"]
24+
2125
# TODO: support other async libraries
2226

2327
[dependencies]
@@ -29,13 +33,13 @@ bytes = "1"
2933
fmmap = { version = "0.3", default-features = false, optional = true }
3034
hilbert_2d = "1"
3135
reqwest = { version = "0.11", default-features = false, optional = true }
36+
rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] }
3237
serde = { version = "1", optional = true }
3338
serde_json = { version = "1", optional = true }
3439
thiserror = "1"
3540
tilejson = { version = "0.4", optional = true }
3641
tokio = { version = "1", default-features = false, features = ["io-util"], optional = true }
3742
varint-rs = "2"
38-
rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] }
3943

4044
[dev-dependencies]
4145
flate2 = "1"

src/async_reader.rs

Lines changed: 3 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,14 @@
22
// so any file larger than 4GB, or an untrusted file with bad data may crash.
33
#![allow(clippy::cast_possible_truncation)]
44

5-
#[cfg(feature = "mmap-async-tokio")]
6-
use std::path::Path;
7-
85
use async_recursion::async_recursion;
96
use async_trait::async_trait;
107
use bytes::Bytes;
11-
#[cfg(feature = "http-async")]
12-
use reqwest::{Client, IntoUrl};
13-
#[cfg(any(
14-
feature = "http-async",
15-
feature = "mmap-async-tokio",
16-
feature = "s3-async-rustls",
17-
feature = "s3-async-native"
18-
))]
8+
#[cfg(feature = "__async")]
199
use tokio::io::AsyncReadExt;
2010

21-
#[cfg(feature = "http-async")]
22-
use crate::backend::HttpBackend;
23-
#[cfg(feature = "mmap-async-tokio")]
24-
use crate::backend::MmapBackend;
25-
#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
26-
use crate::backend::S3Backend;
2711
use crate::cache::DirCacheResult;
28-
#[cfg(any(
29-
feature = "http-async",
30-
feature = "mmap-async-tokio",
31-
feature = "s3-async-native",
32-
feature = "s3-async-rustls"
33-
))]
12+
#[cfg(feature = "__async")]
3413
use crate::cache::{DirectoryCache, NoCache};
3514
use crate::directory::{DirEntry, Directory};
3615
use crate::error::{PmtError, PmtResult};
@@ -227,80 +206,6 @@ impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTile
227206
}
228207
}
229208

230-
#[cfg(feature = "http-async")]
231-
impl AsyncPmTilesReader<HttpBackend, NoCache> {
232-
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
233-
///
234-
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
235-
pub async fn new_with_url<U: IntoUrl>(client: Client, url: U) -> PmtResult<Self> {
236-
Self::new_with_cached_url(NoCache, client, url).await
237-
}
238-
}
239-
240-
#[cfg(feature = "http-async")]
241-
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<HttpBackend, C> {
242-
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
243-
///
244-
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
245-
pub async fn new_with_cached_url<U: IntoUrl>(
246-
cache: C,
247-
client: Client,
248-
url: U,
249-
) -> PmtResult<Self> {
250-
let backend = HttpBackend::try_from(client, url)?;
251-
252-
Self::try_from_cached_source(backend, cache).await
253-
}
254-
}
255-
256-
#[cfg(feature = "mmap-async-tokio")]
257-
impl AsyncPmTilesReader<MmapBackend, NoCache> {
258-
/// Creates a new `PMTiles` reader from a file path using the async mmap backend.
259-
///
260-
/// Fails if [p] does not exist or is an invalid archive.
261-
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> PmtResult<Self> {
262-
Self::new_with_cached_path(NoCache, path).await
263-
}
264-
}
265-
266-
#[cfg(feature = "mmap-async-tokio")]
267-
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
268-
/// Creates a new cached `PMTiles` reader from a file path using the async mmap backend.
269-
///
270-
/// Fails if [p] does not exist or is an invalid archive.
271-
pub async fn new_with_cached_path<P: AsRef<Path>>(cache: C, path: P) -> PmtResult<Self> {
272-
let backend = MmapBackend::try_from(path).await?;
273-
274-
Self::try_from_cached_source(backend, cache).await
275-
}
276-
}
277-
278-
#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
279-
impl AsyncPmTilesReader<S3Backend, NoCache> {
280-
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
281-
///
282-
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
283-
pub async fn new_with_bucket_path(bucket: s3::Bucket, path: String) -> PmtResult<Self> {
284-
Self::new_with_cached_bucket_path(NoCache, bucket, path).await
285-
}
286-
}
287-
288-
#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
289-
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<S3Backend, C> {
290-
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
291-
///
292-
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
293-
pub async fn new_with_cached_bucket_path(
294-
cache: C,
295-
bucket: s3::Bucket,
296-
path: String,
297-
) -> PmtResult<Self> {
298-
let backend = S3Backend::from(bucket, path);
299-
300-
Self::try_from_cached_source(backend, cache).await
301-
}
302-
}
303-
304209
#[async_trait]
305210
pub trait AsyncBackend {
306211
/// Reads exactly `length` bytes starting at `offset`
@@ -314,8 +219,8 @@ pub trait AsyncBackend {
314219
#[cfg(feature = "mmap-async-tokio")]
315220
mod tests {
316221
use super::AsyncPmTilesReader;
317-
use crate::backend::MmapBackend;
318222
use crate::tests::{RASTER_FILE, VECTOR_FILE};
223+
use crate::MmapBackend;
319224

320225
#[tokio::test]
321226
async fn open_sanity_check() {

src/backend/mod.rs

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/backend/s3.rs

Lines changed: 0 additions & 50 deletions
This file was deleted.

src/backend/http.rs renamed to src/backend_http.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,35 @@ use bytes::Bytes;
33
use reqwest::header::{HeaderValue, RANGE};
44
use reqwest::{Client, IntoUrl, Method, Request, StatusCode, Url};
55

6-
use crate::async_reader::AsyncBackend;
6+
use crate::async_reader::{AsyncBackend, AsyncPmTilesReader};
7+
use crate::cache::{DirectoryCache, NoCache};
78
use crate::error::PmtResult;
89
use crate::PmtError;
910

11+
impl AsyncPmTilesReader<HttpBackend, NoCache> {
12+
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
13+
///
14+
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
15+
pub async fn new_with_url<U: IntoUrl>(client: Client, url: U) -> PmtResult<Self> {
16+
Self::new_with_cached_url(NoCache, client, url).await
17+
}
18+
}
19+
20+
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<HttpBackend, C> {
21+
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
22+
///
23+
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
24+
pub async fn new_with_cached_url<U: IntoUrl>(
25+
cache: C,
26+
client: Client,
27+
url: U,
28+
) -> PmtResult<Self> {
29+
let backend = HttpBackend::try_from(client, url)?;
30+
31+
Self::try_from_cached_source(backend, cache).await
32+
}
33+
}
34+
1035
pub struct HttpBackend {
1136
client: Client,
1237
url: Url,

src/backend/mmap.rs renamed to src/backend_mmap.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,30 @@ use async_trait::async_trait;
55
use bytes::{Buf, Bytes};
66
use fmmap::tokio::{AsyncMmapFile, AsyncMmapFileExt as _, AsyncOptions};
77

8-
use crate::async_reader::AsyncBackend;
8+
use crate::async_reader::{AsyncBackend, AsyncPmTilesReader};
9+
use crate::cache::{DirectoryCache, NoCache};
910
use crate::error::{PmtError, PmtResult};
1011

12+
impl AsyncPmTilesReader<MmapBackend, NoCache> {
13+
/// Creates a new `PMTiles` reader from a file path using the async mmap backend.
14+
///
15+
/// Fails if [p] does not exist or is an invalid archive.
16+
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> PmtResult<Self> {
17+
Self::new_with_cached_path(NoCache, path).await
18+
}
19+
}
20+
21+
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
22+
/// Creates a new cached `PMTiles` reader from a file path using the async mmap backend.
23+
///
24+
/// Fails if [p] does not exist or is an invalid archive.
25+
pub async fn new_with_cached_path<P: AsRef<Path>>(cache: C, path: P) -> PmtResult<Self> {
26+
let backend = MmapBackend::try_from(path).await?;
27+
28+
Self::try_from_cached_source(backend, cache).await
29+
}
30+
}
31+
1132
pub struct MmapBackend {
1233
file: AsyncMmapFile,
1334
}

src/backend_s3.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use async_trait::async_trait;
2+
use bytes::Bytes;
3+
use s3::Bucket;
4+
5+
use crate::async_reader::{AsyncBackend, AsyncPmTilesReader};
6+
use crate::cache::{DirectoryCache, NoCache};
7+
use crate::error::PmtError::{ResponseBodyTooLong, UnexpectedNumberOfBytesReturned};
8+
use crate::PmtResult;
9+
10+
impl AsyncPmTilesReader<S3Backend, NoCache> {
11+
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
12+
///
13+
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
14+
pub async fn new_with_bucket_path(bucket: Bucket, path: String) -> PmtResult<Self> {
15+
Self::new_with_cached_bucket_path(NoCache, bucket, path).await
16+
}
17+
}
18+
19+
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<S3Backend, C> {
20+
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
21+
///
22+
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
23+
pub async fn new_with_cached_bucket_path(
24+
cache: C,
25+
bucket: Bucket,
26+
path: String,
27+
) -> PmtResult<Self> {
28+
let backend = S3Backend::from(bucket, path);
29+
30+
Self::try_from_cached_source(backend, cache).await
31+
}
32+
}
33+
34+
pub struct S3Backend {
35+
bucket: Bucket,
36+
path: String,
37+
}
38+
39+
impl S3Backend {
40+
#[must_use]
41+
pub fn from(bucket: Bucket, path: String) -> S3Backend {
42+
Self { bucket, path }
43+
}
44+
}
45+
46+
#[async_trait]
47+
impl AsyncBackend for S3Backend {
48+
async fn read_exact(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
49+
let data = self.read(offset, length).await?;
50+
51+
if data.len() == length {
52+
Ok(data)
53+
} else {
54+
Err(UnexpectedNumberOfBytesReturned(length, data.len()))
55+
}
56+
}
57+
58+
async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
59+
let response = self
60+
.bucket
61+
.get_object_range(
62+
self.path.as_str(),
63+
offset as _,
64+
Some((offset + length - 1) as _),
65+
)
66+
.await?;
67+
68+
let response_bytes = response.bytes();
69+
70+
if response_bytes.len() > length {
71+
Err(ResponseBodyTooLong(response_bytes.len(), length))
72+
} else {
73+
Ok(response_bytes.clone())
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)