Skip to content

Commit

Permalink
Add AWS S3 (#44)
Browse files Browse the repository at this point in the history
* Fix lint.

* Bump rust-s3 dependency.

* Implement read_exact in trait.

* Add aws-sdk-based S3 backend.

* Update CI to use Justfile.

* Fix lint.

* Don't use async in trait.

* Bump MSRV.

* Restore necessary prefix.

* Fix mismatch of MSRV and prefix lint.

* Fix changed feature name.

* Justfile cleanup

* Don't run tests with incompatible features.

* Export AWS backend.

* Use async fn to avoid incorrect marker types.

* Desugar async fn to add trait bounds.

* Fix semver.

* Fix semver features.

* MR Nit fixes.
  • Loading branch information
lseelenbinder authored Sep 20, 2024
1 parent 3012fc4 commit 87c30f1
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 66 deletions.
26 changes: 8 additions & 18 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,28 @@ jobs:
steps:
- name: Checkout sources
uses: actions/checkout@v4
- uses: taiki-e/install-action@v2
with: { tool: just }
- uses: Swatinem/rust-cache@v2
if: github.event_name != 'release' && github.event_name != 'workflow_dispatch'

- run: |
rustc --version
cargo --version
rustup --version
- run: cargo check
- run: rustup component add clippy rustfmt
- run: cargo fmt --all -- --check
- run: cargo clippy --all-targets --all-features -- -D warnings
- run: cargo test --all-targets --all-features
- run: cargo test --features http-async
- run: cargo test --features mmap-async-tokio
- run: cargo test --features tilejson
- run: cargo test --features s3-async-native
- run: cargo test --features s3-async-rustls
- run: cargo test
- run: just test-all
- name: Check semver
uses: obi1kenobi/cargo-semver-checks-action@v2
with:
feature-group: only-explicit-features
features: "http-async,mmap-async-tokio,tilejson,s3-async-rustls,aws-s3-async"

msrv:
name: Test MSRV
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: extractions/setup-just@v2
- uses: Swatinem/rust-cache@v2
if: github.event_name != 'release' && github.event_name != 'workflow_dispatch'
- name: Read crate metadata
Expand All @@ -51,10 +47,4 @@ jobs:
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ steps.metadata.outputs.rust-version }}
- run: cargo test --all-targets --all-features
- run: cargo test --features http-async
- run: cargo test --features mmap-async-tokio
- run: cargo test --features tilejson
- run: cargo test --features s3-async-native
- run: cargo test --features s3-async-rustls
- run: cargo test
- run: just test
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "pmtiles"
version = "0.10.0"
version = "0.11.0"
edition = "2021"
authors = ["Luke Seelenbinder <[email protected]>"]
license = "MIT OR Apache-2.0"
description = "Implementation of the PMTiles v3 spec with multiple sync and async backends."
repository = "https://github.com/stadiamaps/pmtiles-rs"
keywords = ["pmtiles", "gis", "geo"]
rust-version = "1.77.0"
rust-version = "1.81.0"
categories = ["science::geo"]

[features]
Expand All @@ -16,6 +16,7 @@ http-async = ["__async", "dep:reqwest"]
mmap-async-tokio = ["__async", "dep:fmmap", "fmmap?/tokio-async"]
s3-async-native = ["__async-s3", "__async-s3-nativetls"]
s3-async-rustls = ["__async-s3", "__async-s3-rustls"]
aws-s3-async = ["__async-aws-s3"]
tilejson = ["dep:tilejson", "dep:serde", "dep:serde_json"]

# Forward some of the common features to reqwest dependency
Expand All @@ -28,17 +29,19 @@ reqwest-rustls-tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
# Internal features, do not use
__async = ["dep:tokio", "async-compression/tokio"]
__async-s3 = ["__async", "dep:rust-s3"]
__async-s3-nativetls = ["rust-s3?/tokio-native-tls"]
__async-s3-nativetls = ["rust-s3?/use-tokio-native-tls"]
__async-s3-rustls = ["rust-s3?/tokio-rustls-tls"]
__async-aws-s3 = ["__async", "dep:aws-sdk-s3"]

[dependencies]
# TODO: determine how we want to handle compression in async & sync environments
aws-sdk-s3 = { version = "1.49.0", optional = true }
async-compression = { version = "0.4", features = ["gzip"] }
bytes = "1"
fmmap = { version = "0.3", default-features = false, optional = true }
hilbert_2d = "1"
reqwest = { version = "0.12.4", default-features = false, optional = true }
rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] }
rust-s3 = { version = "0.35.1", optional = true, default-features = false, features = ["fail-on-err"] }
serde = { version = "1", optional = true }
serde_json = { version = "1", optional = true }
thiserror = "1"
Expand Down
32 changes: 23 additions & 9 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,46 @@
@_default:
just --list --unsorted

# Run all tests
test:
# These are the same tests that are run on CI. Eventually CI should just call into justfile
# Run cargo check
check:
cargo check

_add_tools:
rustup component add clippy rustfmt
cargo fmt --all -- --check
cargo clippy --all-targets --all-features -- -D warnings
cargo test --all-targets --all-features

# Run all tests
test:
cargo test --features http-async
cargo test --features mmap-async-tokio
cargo test --features tilejson
cargo test --features s3-async-native
cargo test --features s3-async-rustls
cargo test --features aws-s3-async
cargo test
RUSTDOCFLAGS="-D warnings" cargo doc --no-deps

# Run all tests and checks
test-all: check fmt clippy

# Run cargo fmt and cargo clippy
lint: fmt clippy

# Run cargo fmt
fmt:
fmt: _add_tools
cargo fmt --all -- --check

# Run cargo fmt using Rust nightly
fmt-nightly:
cargo +nightly fmt -- --config imports_granularity=Module,group_imports=StdExternalCrate

# Run cargo clippy
clippy:
cargo clippy --workspace --all-targets --all-features --bins --tests --lib --benches -- -D warnings
clippy: _add_tools
cargo clippy --workspace --all-targets --features http-async
cargo clippy --workspace --all-targets --features mmap-async-tokio
cargo clippy --workspace --all-targets --features tilejson
cargo clippy --workspace --all-targets --features s3-async-native
cargo clippy --workspace --all-targets --features s3-async-rustls
cargo clippy --workspace --all-targets --features aws-s3-async

# Build and open code documentation
docs:
Expand Down
18 changes: 17 additions & 1 deletion src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,23 @@ pub trait AsyncBackend {
&self,
offset: usize,
length: usize,
) -> impl Future<Output = PmtResult<Bytes>> + Send;
) -> impl Future<Output = PmtResult<Bytes>> + Send
where
Self: Sync,
{
async move {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(PmtError::UnexpectedNumberOfBytesReturned(
length,
data.len(),
))
}
}
}

/// Reads up to `length` bytes starting at `offset`.
fn read(&self, offset: usize, length: usize) -> impl Future<Output = PmtResult<Bytes>> + Send;
Expand Down
89 changes: 89 additions & 0 deletions src/backend_aws_s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::{
async_reader::{AsyncBackend, AsyncPmTilesReader},
cache::{DirectoryCache, NoCache},
PmtError, PmtResult,
};
use aws_sdk_s3::Client;
use bytes::Bytes;

impl AsyncPmTilesReader<AwsS3Backend, NoCache> {
/// Creates a new `PMTiles` reader from a client, bucket and key to the
/// archive using the `aws-sdk-s3` backend.
///
/// Fails if the [bucket] or [key] does not exist or is an invalid
/// archive.
/// (Note: S3 requests are made to validate it.)
pub async fn new_with_client_bucket_and_path(
client: Client,
bucket: String,
key: String,
) -> PmtResult<Self> {
Self::new_with_cached_client_bucket_and_path(NoCache, client, bucket, key).await
}
}

impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<AwsS3Backend, C> {
/// Creates a new `PMTiles` reader from a client, bucket and key to the
/// archive using the `aws-sdk-s3` backend. Caches using the designated
/// [cache].
///
/// Fails if the [bucket] or [key] does not exist or is an invalid
/// archive.
/// (Note: S3 requests are made to validate it.)
pub async fn new_with_cached_client_bucket_and_path(
cache: C,
client: Client,
bucket: String,
key: String,
) -> PmtResult<Self> {
let backend = AwsS3Backend::from(client, bucket, key);

Self::try_from_cached_source(backend, cache).await
}
}

pub struct AwsS3Backend {
client: Client,
bucket: String,
key: String,
}

impl AwsS3Backend {
#[must_use]
pub fn from(client: Client, bucket: String, key: String) -> Self {
Self {
client,
bucket,
key,
}
}
}

impl AsyncBackend for AwsS3Backend {
async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let range_end = offset + length - 1;
let range = format!("bytes={offset}-{range_end}");

let obj = self
.client
.get_object()
.bucket(self.bucket.clone())
.key(self.key.clone())
.range(range)
.send()
.await?;

let response_bytes = obj
.body
.collect()
.await
.map_err(|e| PmtError::Reading(e.into()))?
.into_bytes();

if response_bytes.len() > length {
Err(PmtError::ResponseBodyTooLong(response_bytes.len(), length))
} else {
Ok(response_bytes)
}
}
}
13 changes: 0 additions & 13 deletions src/backend_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,6 @@ impl HttpBackend {
}

impl AsyncBackend for HttpBackend {
async fn read_exact(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(PmtError::UnexpectedNumberOfBytesReturned(
length,
data.len(),
))
}
}

async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let end = offset + length - 1;
let range = format!("bytes={offset}-{end}");
Expand Down
31 changes: 13 additions & 18 deletions src/backend_s3.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
use bytes::Bytes;
use s3::Bucket;

use crate::async_reader::{AsyncBackend, AsyncPmTilesReader};
use crate::cache::{DirectoryCache, NoCache};
use crate::error::PmtError::{ResponseBodyTooLong, UnexpectedNumberOfBytesReturned};
use crate::PmtResult;
use crate::{
async_reader::{AsyncBackend, AsyncPmTilesReader},
cache::{DirectoryCache, NoCache},
error::PmtError::ResponseBodyTooLong,
PmtResult,
};

impl AsyncPmTilesReader<S3Backend, NoCache> {
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
/// Creates a new `PMTiles` reader from a bucket and path to the
/// archive using the `rust-s3` backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
/// Fails if [bucket] or [path] does not exist or is an invalid archive. (Note: S3 requests are made to validate it.)
pub async fn new_with_bucket_path(bucket: Bucket, path: String) -> PmtResult<Self> {
Self::new_with_cached_bucket_path(NoCache, bucket, path).await
}
}

impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<S3Backend, C> {
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
/// Creates a new `PMTiles` reader from a bucket and path to the
/// archive using the `rust-s3` backend with a given [cache] backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
/// Fails if [bucket] or [path] does not exist or is an invalid archive. (Note: S3 requests are made to validate it.)
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
pub async fn new_with_cached_bucket_path(
cache: C,
bucket: Bucket,
Expand All @@ -43,16 +48,6 @@ impl S3Backend {
}

impl AsyncBackend for S3Backend {
async fn read_exact(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(UnexpectedNumberOfBytesReturned(length, data.len()))
}
}

async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let response = self
.bucket
Expand Down
2 changes: 1 addition & 1 deletion src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Directory {
/// Get an estimated byte size of the directory object. Use this for cache eviction.
#[must_use]
pub fn get_approx_byte_size(&self) -> usize {
self.entries.capacity() * std::mem::size_of::<DirEntry>()
self.entries.capacity() * size_of::<DirEntry>()
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ pub enum PmtError {
#[cfg(feature = "mmap-async-tokio")]
#[error("Unable to open mmap file")]
UnableToOpenMmapFile,
#[cfg(any(feature = "http-async", feature = "__async-s3"))]
#[error("Unexpected number of bytes returned [expected: {0}, received: {1}].")]
UnexpectedNumberOfBytesReturned(usize, usize),
#[cfg(feature = "http-async")]
#[error("Range requests unsupported")]
RangeRequestsUnsupported,
#[cfg(any(feature = "http-async", feature = "__async-s3"))]
#[cfg(any(
feature = "http-async",
feature = "__async-s3",
feature = "__async-aws-s3"
))]
#[error("HTTP response body is too long, Response {0}B > requested {1}B")]
ResponseBodyTooLong(usize, usize),
#[cfg(feature = "http-async")]
Expand All @@ -51,4 +54,9 @@ pub enum PmtError {
#[cfg(feature = "__async-s3")]
#[error(transparent)]
S3(#[from] s3::error::S3Error),
#[cfg(feature = "__async-aws-s3")]
#[error(transparent)]
AwsS3Request(
#[from] aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
),
}
Loading

0 comments on commit 87c30f1

Please sign in to comment.