From 87c30f132429d44423d1358c6c8ec26469ad1fc1 Mon Sep 17 00:00:00 2001 From: Luke Seelenbinder Date: Fri, 20 Sep 2024 13:35:00 +0300 Subject: [PATCH] Add AWS S3 (#44) * Fix lint. * Bump rust-s3 dependency. * Implement read_exact in trait. * Add aws-sdk-based S3 backend. * Update CI to use Justfile. * Fix lint. * Don't use async in trait. * Bump MSRV. * Restore necessary prefix. * Fix mismatch of MSRV and prefix lint. * Fix changed feature name. * Justfile cleanup * Don't run tests with incompatible features. * Export AWS backend. * Use async fn to avoid incorrect marker types. * Desugar async fn to add trait bounds. * Fix semver. * Fix semver features. * MR Nit fixes. --- .github/workflows/test.yml | 26 ++++------- Cargo.toml | 11 +++-- justfile | 32 ++++++++++---- src/async_reader.rs | 18 +++++++- src/backend_aws_s3.rs | 89 ++++++++++++++++++++++++++++++++++++++ src/backend_http.rs | 13 ------ src/backend_s3.rs | 31 ++++++------- src/directory.rs | 2 +- src/error.rs | 12 ++++- src/lib.rs | 6 +++ 10 files changed, 174 insertions(+), 66 deletions(-) create mode 100644 src/backend_aws_s3.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1981e43..4a56e63 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,6 +16,8 @@ jobs: steps: - name: Checkout sources uses: actions/checkout@v4 + - uses: taiki-e/install-action@v2 + with: { tool: just } - uses: Swatinem/rust-cache@v2 if: github.event_name != 'release' && github.event_name != 'workflow_dispatch' @@ -23,25 +25,19 @@ jobs: rustc --version cargo --version rustup --version - - run: cargo check - - run: rustup component add clippy rustfmt - - run: cargo fmt --all -- --check - - run: cargo clippy --all-targets --all-features -- -D warnings - - run: cargo test --all-targets --all-features - - run: cargo test --features http-async - - run: cargo test --features mmap-async-tokio - - run: cargo test --features tilejson - - run: cargo test --features s3-async-native - - run: cargo test --features s3-async-rustls - - run: cargo test + - run: just test-all - name: Check semver uses: obi1kenobi/cargo-semver-checks-action@v2 + with: + feature-group: only-explicit-features + features: "http-async,mmap-async-tokio,tilejson,s3-async-rustls,aws-s3-async" msrv: name: Test MSRV runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + - uses: extractions/setup-just@v2 - uses: Swatinem/rust-cache@v2 if: github.event_name != 'release' && github.event_name != 'workflow_dispatch' - name: Read crate metadata @@ -51,10 +47,4 @@ jobs: uses: dtolnay/rust-toolchain@stable with: toolchain: ${{ steps.metadata.outputs.rust-version }} - - run: cargo test --all-targets --all-features - - run: cargo test --features http-async - - run: cargo test --features mmap-async-tokio - - run: cargo test --features tilejson - - run: cargo test --features s3-async-native - - run: cargo test --features s3-async-rustls - - run: cargo test + - run: just test diff --git a/Cargo.toml b/Cargo.toml index e4984a7..babc2d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "pmtiles" -version = "0.10.0" +version = "0.11.0" edition = "2021" authors = ["Luke Seelenbinder "] license = "MIT OR Apache-2.0" description = "Implementation of the PMTiles v3 spec with multiple sync and async backends." repository = "https://github.com/stadiamaps/pmtiles-rs" keywords = ["pmtiles", "gis", "geo"] -rust-version = "1.77.0" +rust-version = "1.81.0" categories = ["science::geo"] [features] @@ -16,6 +16,7 @@ http-async = ["__async", "dep:reqwest"] mmap-async-tokio = ["__async", "dep:fmmap", "fmmap?/tokio-async"] s3-async-native = ["__async-s3", "__async-s3-nativetls"] s3-async-rustls = ["__async-s3", "__async-s3-rustls"] +aws-s3-async = ["__async-aws-s3"] tilejson = ["dep:tilejson", "dep:serde", "dep:serde_json"] # Forward some of the common features to reqwest dependency @@ -28,17 +29,19 @@ reqwest-rustls-tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"] # Internal features, do not use __async = ["dep:tokio", "async-compression/tokio"] __async-s3 = ["__async", "dep:rust-s3"] -__async-s3-nativetls = ["rust-s3?/tokio-native-tls"] +__async-s3-nativetls = ["rust-s3?/use-tokio-native-tls"] __async-s3-rustls = ["rust-s3?/tokio-rustls-tls"] +__async-aws-s3 = ["__async", "dep:aws-sdk-s3"] [dependencies] # TODO: determine how we want to handle compression in async & sync environments +aws-sdk-s3 = { version = "1.49.0", optional = true } async-compression = { version = "0.4", features = ["gzip"] } bytes = "1" fmmap = { version = "0.3", default-features = false, optional = true } hilbert_2d = "1" reqwest = { version = "0.12.4", default-features = false, optional = true } -rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] } +rust-s3 = { version = "0.35.1", optional = true, default-features = false, features = ["fail-on-err"] } serde = { version = "1", optional = true } serde_json = { version = "1", optional = true } thiserror = "1" diff --git a/justfile b/justfile index aace664..1038eb0 100644 --- a/justfile +++ b/justfile @@ -3,32 +3,46 @@ @_default: just --list --unsorted -# Run all tests -test: - # These are the same tests that are run on CI. Eventually CI should just call into justfile +# Run cargo check +check: cargo check + +_add_tools: rustup component add clippy rustfmt - cargo fmt --all -- --check - cargo clippy --all-targets --all-features -- -D warnings - cargo test --all-targets --all-features + +# Run all tests +test: cargo test --features http-async cargo test --features mmap-async-tokio cargo test --features tilejson cargo test --features s3-async-native cargo test --features s3-async-rustls + cargo test --features aws-s3-async cargo test RUSTDOCFLAGS="-D warnings" cargo doc --no-deps +# Run all tests and checks +test-all: check fmt clippy + # Run cargo fmt and cargo clippy lint: fmt clippy # Run cargo fmt -fmt: +fmt: _add_tools + cargo fmt --all -- --check + +# Run cargo fmt using Rust nightly +fmt-nightly: cargo +nightly fmt -- --config imports_granularity=Module,group_imports=StdExternalCrate # Run cargo clippy -clippy: - cargo clippy --workspace --all-targets --all-features --bins --tests --lib --benches -- -D warnings +clippy: _add_tools + cargo clippy --workspace --all-targets --features http-async + cargo clippy --workspace --all-targets --features mmap-async-tokio + cargo clippy --workspace --all-targets --features tilejson + cargo clippy --workspace --all-targets --features s3-async-native + cargo clippy --workspace --all-targets --features s3-async-rustls + cargo clippy --workspace --all-targets --features aws-s3-async # Build and open code documentation docs: diff --git a/src/async_reader.rs b/src/async_reader.rs index 08170eb..12c48c0 100644 --- a/src/async_reader.rs +++ b/src/async_reader.rs @@ -219,7 +219,23 @@ pub trait AsyncBackend { &self, offset: usize, length: usize, - ) -> impl Future> + Send; + ) -> impl Future> + Send + where + Self: Sync, + { + async move { + let data = self.read(offset, length).await?; + + if data.len() == length { + Ok(data) + } else { + Err(PmtError::UnexpectedNumberOfBytesReturned( + length, + data.len(), + )) + } + } + } /// Reads up to `length` bytes starting at `offset`. fn read(&self, offset: usize, length: usize) -> impl Future> + Send; diff --git a/src/backend_aws_s3.rs b/src/backend_aws_s3.rs new file mode 100644 index 0000000..306bec3 --- /dev/null +++ b/src/backend_aws_s3.rs @@ -0,0 +1,89 @@ +use crate::{ + async_reader::{AsyncBackend, AsyncPmTilesReader}, + cache::{DirectoryCache, NoCache}, + PmtError, PmtResult, +}; +use aws_sdk_s3::Client; +use bytes::Bytes; + +impl AsyncPmTilesReader { + /// Creates a new `PMTiles` reader from a client, bucket and key to the + /// archive using the `aws-sdk-s3` backend. + /// + /// Fails if the [bucket] or [key] does not exist or is an invalid + /// archive. + /// (Note: S3 requests are made to validate it.) + pub async fn new_with_client_bucket_and_path( + client: Client, + bucket: String, + key: String, + ) -> PmtResult { + Self::new_with_cached_client_bucket_and_path(NoCache, client, bucket, key).await + } +} + +impl AsyncPmTilesReader { + /// Creates a new `PMTiles` reader from a client, bucket and key to the + /// archive using the `aws-sdk-s3` backend. Caches using the designated + /// [cache]. + /// + /// Fails if the [bucket] or [key] does not exist or is an invalid + /// archive. + /// (Note: S3 requests are made to validate it.) + pub async fn new_with_cached_client_bucket_and_path( + cache: C, + client: Client, + bucket: String, + key: String, + ) -> PmtResult { + let backend = AwsS3Backend::from(client, bucket, key); + + Self::try_from_cached_source(backend, cache).await + } +} + +pub struct AwsS3Backend { + client: Client, + bucket: String, + key: String, +} + +impl AwsS3Backend { + #[must_use] + pub fn from(client: Client, bucket: String, key: String) -> Self { + Self { + client, + bucket, + key, + } + } +} + +impl AsyncBackend for AwsS3Backend { + async fn read(&self, offset: usize, length: usize) -> PmtResult { + let range_end = offset + length - 1; + let range = format!("bytes={offset}-{range_end}"); + + let obj = self + .client + .get_object() + .bucket(self.bucket.clone()) + .key(self.key.clone()) + .range(range) + .send() + .await?; + + let response_bytes = obj + .body + .collect() + .await + .map_err(|e| PmtError::Reading(e.into()))? + .into_bytes(); + + if response_bytes.len() > length { + Err(PmtError::ResponseBodyTooLong(response_bytes.len(), length)) + } else { + Ok(response_bytes) + } + } +} diff --git a/src/backend_http.rs b/src/backend_http.rs index 14180de..7632042 100644 --- a/src/backend_http.rs +++ b/src/backend_http.rs @@ -46,19 +46,6 @@ impl HttpBackend { } impl AsyncBackend for HttpBackend { - async fn read_exact(&self, offset: usize, length: usize) -> PmtResult { - let data = self.read(offset, length).await?; - - if data.len() == length { - Ok(data) - } else { - Err(PmtError::UnexpectedNumberOfBytesReturned( - length, - data.len(), - )) - } - } - async fn read(&self, offset: usize, length: usize) -> PmtResult { let end = offset + length - 1; let range = format!("bytes={offset}-{end}"); diff --git a/src/backend_s3.rs b/src/backend_s3.rs index 89902cd..e319ffd 100644 --- a/src/backend_s3.rs +++ b/src/backend_s3.rs @@ -1,24 +1,29 @@ use bytes::Bytes; use s3::Bucket; -use crate::async_reader::{AsyncBackend, AsyncPmTilesReader}; -use crate::cache::{DirectoryCache, NoCache}; -use crate::error::PmtError::{ResponseBodyTooLong, UnexpectedNumberOfBytesReturned}; -use crate::PmtResult; +use crate::{ + async_reader::{AsyncBackend, AsyncPmTilesReader}, + cache::{DirectoryCache, NoCache}, + error::PmtError::ResponseBodyTooLong, + PmtResult, +}; impl AsyncPmTilesReader { - /// Creates a new `PMTiles` reader from a URL using the Reqwest backend. + /// Creates a new `PMTiles` reader from a bucket and path to the + /// archive using the `rust-s3` backend. /// - /// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.) + /// Fails if [bucket] or [path] does not exist or is an invalid archive. (Note: S3 requests are made to validate it.) pub async fn new_with_bucket_path(bucket: Bucket, path: String) -> PmtResult { Self::new_with_cached_bucket_path(NoCache, bucket, path).await } } impl AsyncPmTilesReader { - /// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend. + /// Creates a new `PMTiles` reader from a bucket and path to the + /// archive using the `rust-s3` backend with a given [cache] backend. /// - /// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.) + /// Fails if [bucket] or [path] does not exist or is an invalid archive. (Note: S3 requests are made to validate it.) + /// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend. pub async fn new_with_cached_bucket_path( cache: C, bucket: Bucket, @@ -43,16 +48,6 @@ impl S3Backend { } impl AsyncBackend for S3Backend { - async fn read_exact(&self, offset: usize, length: usize) -> PmtResult { - let data = self.read(offset, length).await?; - - if data.len() == length { - Ok(data) - } else { - Err(UnexpectedNumberOfBytesReturned(length, data.len())) - } - } - async fn read(&self, offset: usize, length: usize) -> PmtResult { let response = self .bucket diff --git a/src/directory.rs b/src/directory.rs index c416d5e..a839a91 100644 --- a/src/directory.rs +++ b/src/directory.rs @@ -41,7 +41,7 @@ impl Directory { /// Get an estimated byte size of the directory object. Use this for cache eviction. #[must_use] pub fn get_approx_byte_size(&self) -> usize { - self.entries.capacity() * std::mem::size_of::() + self.entries.capacity() * size_of::() } } diff --git a/src/error.rs b/src/error.rs index 43cec6e..062b916 100644 --- a/src/error.rs +++ b/src/error.rs @@ -33,13 +33,16 @@ pub enum PmtError { #[cfg(feature = "mmap-async-tokio")] #[error("Unable to open mmap file")] UnableToOpenMmapFile, - #[cfg(any(feature = "http-async", feature = "__async-s3"))] #[error("Unexpected number of bytes returned [expected: {0}, received: {1}].")] UnexpectedNumberOfBytesReturned(usize, usize), #[cfg(feature = "http-async")] #[error("Range requests unsupported")] RangeRequestsUnsupported, - #[cfg(any(feature = "http-async", feature = "__async-s3"))] + #[cfg(any( + feature = "http-async", + feature = "__async-s3", + feature = "__async-aws-s3" + ))] #[error("HTTP response body is too long, Response {0}B > requested {1}B")] ResponseBodyTooLong(usize, usize), #[cfg(feature = "http-async")] @@ -51,4 +54,9 @@ pub enum PmtError { #[cfg(feature = "__async-s3")] #[error(transparent)] S3(#[from] s3::error::S3Error), + #[cfg(feature = "__async-aws-s3")] + #[error(transparent)] + AwsS3Request( + #[from] aws_sdk_s3::error::SdkError, + ), } diff --git a/src/lib.rs b/src/lib.rs index a8fe6f3..855245d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ #[cfg(feature = "__async")] pub mod async_reader; +#[cfg(feature = "__async-aws-s3")] +mod backend_aws_s3; #[cfg(feature = "http-async")] mod backend_http; #[cfg(feature = "mmap-async-tokio")] @@ -16,6 +18,8 @@ mod header; #[cfg(feature = "__async")] mod tile; +#[cfg(feature = "aws-s3-async")] +pub use backend_aws_s3::AwsS3Backend; #[cfg(feature = "http-async")] pub use backend_http::HttpBackend; #[cfg(feature = "mmap-async-tokio")] @@ -27,6 +31,8 @@ pub use error::{PmtError, PmtResult}; pub use header::{Compression, Header, TileType}; // // Re-export crates exposed in our API to simplify dependency management +#[cfg(feature = "__async-aws-s3")] +pub use aws_sdk_s3; #[cfg(feature = "http-async")] pub use reqwest; #[cfg(feature = "__async-s3")]