From a251b52a1567aa82f3338456fcb35cfc00679c79 Mon Sep 17 00:00:00 2001 From: Yuri Astrakhan Date: Tue, 7 Nov 2023 23:09:19 -0500 Subject: [PATCH] Refactor to cleanup and prepare for directory caching * there is no point in returning type and compression with every `get_tile` -- PMTiles keeps this data in the header, so it is constant, and if the user needs it, they can get it directly. Returns `bytes` now. * add `Entry::is_leaf` helper * add `Header::get_bounds` and `Header::get_center` (tilejson structs) * no need for `AsyncPmTilesReader` type - it has to be specified in the impl anyway * no need for the `backend::read_initial_bytes` - it is only used once, has default implementation anyway. Inlined. * inlined `read_directory_with_backend` - used once and tiny * split up the `find_tile_entry` into two functions - I will need this later to add caching -- the root entry is permanently cached as part of the main struct, but the other ones are not, so needs a different code path. * added `cargo test` for default features --- .github/workflows/test.yml | 1 + Cargo.toml | 4 +- justfile | 3 +- src/async_reader.rs | 151 ++++++++++++++----------------------- src/directory.rs | 9 ++- src/header.rs | 30 +++++--- src/http.rs | 2 +- src/tile.rs | 10 --- 8 files changed, 91 insertions(+), 119 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ea34a84..59dbcc6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -29,3 +29,4 @@ jobs: - run: cargo test --features http-async - run: cargo test --features mmap-async-tokio - run: cargo test --features tilejson + - run: cargo test diff --git a/Cargo.toml b/Cargo.toml index 8161f17..efcbda9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pmtiles" -version = "0.3.1" +version = "0.4.0" edition = "2021" authors = ["Luke Seelenbinder "] license = "MIT OR Apache-2.0" @@ -36,10 +36,10 @@ tokio = { version = "1", default-features = false, features = ["io-util"], optio varint-rs = "2" [dev-dependencies] +flate2 = "1" fmmap = { version = "0.3", features = ["tokio-async"] } reqwest = { version = "0.11", features = ["rustls-tls-webpki-roots"] } tokio = { version = "1", features = ["test-util", "macros", "rt"] } -flate2 = "1" [package.metadata.docs.rs] all-features = true diff --git a/justfile b/justfile index c4cf879..ec25373 100644 --- a/justfile +++ b/justfile @@ -14,6 +14,7 @@ test: cargo test --features http-async cargo test --features mmap-async-tokio cargo test --features tilejson + cargo test RUSTDOCFLAGS="-D warnings" cargo doc --no-deps # Run cargo fmt and cargo clippy @@ -25,7 +26,7 @@ fmt: # Run cargo clippy clippy: - cargo clippy --workspace --all-targets --bins --tests --lib --benches -- -D warnings + cargo clippy --workspace --all-targets --all-features --bins --tests --lib --benches -- -D warnings # Build and open code documentation docs: diff --git a/src/async_reader.rs b/src/async_reader.rs index 9defdea..41794d8 100644 --- a/src/async_reader.rs +++ b/src/async_reader.rs @@ -16,12 +16,12 @@ use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES}; use crate::http::HttpBackend; #[cfg(feature = "mmap-async-tokio")] use crate::mmap::MmapBackend; -use crate::tile::{tile_id, Tile}; +use crate::tile::tile_id; use crate::{Compression, Header}; -pub struct AsyncPmTilesReader { - pub header: Header, +pub struct AsyncPmTilesReader { backend: B, + header: Header, root_directory: Directory, } @@ -30,11 +30,13 @@ impl AsyncPmTilesReader { /// /// Note: Prefer using new_with_* methods. pub async fn try_from_source(backend: B) -> Result { - let mut initial_bytes = backend.read_initial_bytes().await?; - - let header_bytes = initial_bytes.split_to(HEADER_SIZE); + // Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory. + let mut initial_bytes = backend.read(0, MAX_INITIAL_BYTES).await?; + if initial_bytes.len() < HEADER_SIZE { + return Err(Error::InvalidHeader); + } - let header = Header::try_from_bytes(header_bytes)?; + let header = Header::try_from_bytes(initial_bytes.split_to(HEADER_SIZE))?; let directory_bytes = initial_bytes .split_off((header.root_offset as usize) - HEADER_SIZE) @@ -44,31 +46,27 @@ impl AsyncPmTilesReader { Self::read_compressed_directory(header.internal_compression, directory_bytes).await?; Ok(Self { - header, backend, + header, root_directory, }) } - /// Fetches a [Tile] from the archive. - pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> Option { + /// Fetches tile bytes from the archive. + pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> Option { let tile_id = tile_id(z, x, y); - let entry = self.find_tile_entry(tile_id, None, 0).await?; - - let data = self - .backend - .read_exact( - (self.header.data_offset + entry.offset) as _, - entry.length as _, - ) - .await - .ok()?; + let entry = self.find_tile_entry(tile_id).await?; - Some(Tile { - data, - tile_type: self.header.tile_type, - tile_compression: self.header.tile_compression, - }) + let offset = (self.header.data_offset + entry.offset) as _; + let length = entry.length as _; + let data = self.backend.read_exact(offset, length).await.ok()?; + + Some(data) + } + + /// Access header information. + pub fn header(&self) -> &Header { + &self.header } /// Gets metadata from the archive. @@ -76,13 +74,9 @@ impl AsyncPmTilesReader { /// Note: by spec, this should be valid JSON. This method currently returns a [String]. /// This may change in the future. pub async fn get_metadata(&self) -> Result { - let metadata = self - .backend - .read_exact( - self.header.metadata_offset as _, - self.header.metadata_length as _, - ) - .await?; + let offset = self.header.metadata_offset as _; + let length = self.header.metadata_length as _; + let metadata = self.backend.read_exact(offset, length).await?; let decompressed_metadata = Self::decompress(self.header.internal_compression, metadata).await?; @@ -132,49 +126,42 @@ impl AsyncPmTilesReader { Ok(tj) } - #[async_recursion] - async fn find_tile_entry( - &self, - tile_id: u64, - next_dir: Option, - depth: u8, - ) -> Option { - // Max recursion... - if depth >= 4 { - return None; + /// Recursively locates a tile in the archive. + async fn find_tile_entry(&self, tile_id: u64) -> Option { + let entry = self.root_directory.find_tile_id(tile_id); + if let Some(entry) = entry { + if entry.is_leaf() { + return self.find_entry_rec(tile_id, entry, 0).await; + } } + entry.cloned() + } - let next_dir = next_dir.as_ref().unwrap_or(&self.root_directory); - - match next_dir.find_tile_id(tile_id) { - None => None, - Some(needle) => { - if needle.run_length == 0 { - // Leaf directory - let next_dir = self - .read_directory( - (self.header.leaf_offset + needle.offset) as _, - needle.length as _, - ) - .await - .ok()?; - self.find_tile_entry(tile_id, Some(next_dir), depth + 1) - .await + #[async_recursion] + async fn find_entry_rec(&self, tile_id: u64, entry: &Entry, depth: u8) -> Option { + // the recursion is done as two functions because it is a bit cleaner, + // and it allows directory to be cached later without cloning it first. + let offset = (self.header.leaf_offset + entry.offset) as _; + let length = entry.length as _; + let dir = self.read_directory(offset, length).await.ok()?; + let entry = dir.find_tile_id(tile_id); + + if let Some(entry) = entry { + if entry.is_leaf() { + return if depth <= 4 { + self.find_entry_rec(tile_id, entry, depth + 1).await } else { - Some(needle.clone()) - } + None + }; } } + + entry.cloned() } async fn read_directory(&self, offset: usize, length: usize) -> Result { - Self::read_directory_with_backend( - &self.backend, - self.header.internal_compression, - offset, - length, - ) - .await + let data = self.backend.read_exact(offset, length).await?; + Self::read_compressed_directory(self.header.internal_compression, data).await } async fn read_compressed_directory( @@ -182,21 +169,9 @@ impl AsyncPmTilesReader { bytes: Bytes, ) -> Result { let decompressed_bytes = Self::decompress(compression, bytes).await?; - Directory::try_from(decompressed_bytes) } - async fn read_directory_with_backend( - backend: &B, - compression: Compression, - offset: usize, - length: usize, - ) -> Result { - let directory_bytes = backend.read_exact(offset, length).await?; - - Self::read_compressed_directory(compression, directory_bytes).await - } - async fn decompress(compression: Compression, bytes: Bytes) -> Result { let mut decompressed_bytes = Vec::with_capacity(bytes.len() * 2); match compression { @@ -229,8 +204,8 @@ impl AsyncPmTilesReader { /// Creates a new PMTiles reader from a file path using the async mmap backend. /// /// Fails if [p] does not exist or is an invalid archive. - pub async fn new_with_path>(p: P) -> Result { - let backend = MmapBackend::try_from(p).await?; + pub async fn new_with_path>(path: P) -> Result { + let backend = MmapBackend::try_from(path).await?; Self::try_from_source(backend).await } @@ -243,16 +218,6 @@ pub trait AsyncBackend { /// Reads up to `length` bytes starting at `offset`. async fn read(&self, offset: usize, length: usize) -> Result; - - /// Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory. - async fn read_initial_bytes(&self) -> Result { - let bytes = self.read(0, MAX_INITIAL_BYTES).await?; - if bytes.len() < HEADER_SIZE { - return Err(Error::InvalidHeader); - } - - Ok(bytes) - } } #[cfg(test)] @@ -274,11 +239,11 @@ mod tests { let tile = tiles.get_tile(z, x, y).await.unwrap(); assert_eq!( - tile.data.len(), + tile.len(), fixture_bytes.len(), "Expected tile length to match." ); - assert_eq!(tile.data, fixture_bytes, "Expected tile to match fixture."); + assert_eq!(tile, fixture_bytes, "Expected tile to match fixture."); } #[tokio::test] diff --git a/src/directory.rs b/src/directory.rs index 2292b8e..ccceab0 100644 --- a/src/directory.rs +++ b/src/directory.rs @@ -25,7 +25,7 @@ impl Directory { // https://github.com/protomaps/PMTiles/blob/9c7f298fb42290354b8ed0a9b2f50e5c0d270c40/js/index.ts#L210 if next_id > 0 { let previous_tile = self.entries.get(next_id - 1)?; - if previous_tile.run_length == 0 + if previous_tile.is_leaf() || tile_id - previous_tile.tile_id < previous_tile.run_length as u64 { return Some(previous_tile); @@ -88,6 +88,13 @@ pub(crate) struct Entry { pub(crate) run_length: u32, } +#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))] +impl Entry { + pub fn is_leaf(&self) -> bool { + self.run_length == 0 + } +} + #[cfg(test)] mod tests { use std::io::{BufReader, Read, Write}; diff --git a/src/header.rs b/src/header.rs index 0f6487f..e0c3457 100644 --- a/src/header.rs +++ b/src/header.rs @@ -80,19 +80,27 @@ impl Header { tiles: sources, minzoom: self.min_zoom, maxzoom: self.max_zoom, - bounds: tilejson::Bounds::new( - self.min_longitude as f64, - self.min_latitude as f64, - self.max_longitude as f64, - self.max_latitude as f64, - ), - center: tilejson::Center::new( - self.center_longitude as f64, - self.center_latitude as f64, - self.center_zoom, - ), + bounds: self.get_bounds(), + center: self.get_center(), } } + + pub fn get_bounds(&self) -> tilejson::Bounds { + tilejson::Bounds::new( + self.min_longitude as f64, + self.min_latitude as f64, + self.max_longitude as f64, + self.max_latitude as f64, + ) + } + + pub fn get_center(&self) -> tilejson::Center { + tilejson::Center::new( + self.center_longitude as f64, + self.center_latitude as f64, + self.center_zoom, + ) + } } #[derive(Debug, Eq, PartialEq, Copy, Clone)] diff --git a/src/http.rs b/src/http.rs index 2d1d181..97bb507 100644 --- a/src/http.rs +++ b/src/http.rs @@ -67,6 +67,6 @@ mod tests { let client = reqwest::Client::builder().use_rustls_tls().build().unwrap(); let backend = HttpBackend::try_from(client, TEST_URL).unwrap(); - let _tiles = AsyncPmTilesReader::try_from_source(backend).await.unwrap(); + AsyncPmTilesReader::try_from_source(backend).await.unwrap(); } } diff --git a/src/tile.rs b/src/tile.rs index bb1f7d1..0d7005c 100644 --- a/src/tile.rs +++ b/src/tile.rs @@ -1,7 +1,3 @@ -use bytes::Bytes; - -use crate::{Compression, TileType}; - #[cfg(any(feature = "http-async", feature = "mmap-async-tokio", test))] pub(crate) fn tile_id(z: u8, x: u64, y: u64) -> u64 { if z == 0 { @@ -15,12 +11,6 @@ pub(crate) fn tile_id(z: u8, x: u64, y: u64) -> u64 { base_id + tile_id } -pub struct Tile { - pub data: Bytes, - pub tile_type: TileType, - pub tile_compression: Compression, -} - #[cfg(test)] mod test { use super::tile_id;