Lots of lints and clippy things
* use `[lints]` Cargo.toml sections
* added a few FIXME/TODO comments to the code - we don't need to solve them now, but they should remind us to revisit those spots later, as they might be a source of bugs.
nyurik committed Nov 20, 2023
1 parent ef1aee2 commit 180e7fb
Showing 7 changed files with 186 additions and 44 deletions.
10 changes: 10 additions & 0 deletions Cargo.toml
@@ -43,3 +43,13 @@ tokio = { version = "1", features = ["test-util", "macros", "rt"] }

[package.metadata.docs.rs]
all-features = true

[lints.rust]
unsafe_code = "forbid"
unused_qualifications = "warn"

[lints.clippy]
pedantic = { level = "warn", priority = -1 }
missing_errors_doc = "allow"
module_name_repetitions = "allow"
similar_names = "allow"
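
For context: the `[lints]` tables were stabilized in Rust 1.74, released a few days before this commit, and replace crate-level lint attributes in `lib.rs`. A rough attribute equivalent of the sections above (illustrative sketch only, not part of this diff):

#![forbid(unsafe_code)]
#![warn(unused_qualifications)]
#![warn(clippy::pedantic)]
#![allow(
    clippy::missing_errors_doc,
    clippy::module_name_repetitions,
    clippy::similar_names
)]

The `priority = -1` on `pedantic` makes the group apply before the individual lints, so the three `allow` entries win even though `pedantic` would otherwise enable them.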
88 changes: 70 additions & 18 deletions src/async_reader.rs
@@ -1,3 +1,7 @@
// FIXME: This seems like a bug - there are lots of u64 to usize conversions in this file,
// so any file larger than 4GB, or an untrusted file with bad data, may crash.
#![allow(clippy::cast_possible_truncation)]
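
A minimal sketch of what addressing that FIXME might look like: replace the `as usize` casts with fallible conversions so an oversized offset surfaces as an error instead of truncating. The error variant here is hypothetical; this commit does not add it.

fn to_offset(v: u64) -> Result<usize, PmtError> {
    // On 32-bit targets a u64 may not fit in usize; fail rather than truncate.
    usize::try_from(v).map_err(|_| PmtError::InvalidOffset) // hypothetical variant
}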

#[cfg(feature = "mmap-async-tokio")]
use std::path::Path;

@@ -9,7 +13,10 @@ use reqwest::{Client, IntoUrl};
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
use tokio::io::AsyncReadExt;

use crate::directory::{Directory, Entry};
use crate::cache::DirCacheResult;
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
use crate::cache::{DirectoryCache, NoCache};
use crate::directory::{DirEntry, Directory};
use crate::error::PmtError;
use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
#[cfg(feature = "http-async")]
@@ -19,17 +26,27 @@ use crate::mmap::MmapBackend;
use crate::tile::tile_id;
use crate::{Compression, Header};

pub struct AsyncPmTilesReader<B> {
pub struct AsyncPmTilesReader<B, C = NoCache> {
backend: B,
cache: C,
header: Header,
root_directory: Directory,
}

impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
/// Creates a new reader from a specified source and validates the provided PMTiles archive is valid.
impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B, NoCache> {
/// Creates a new reader from a specified source and validates the provided `PMTiles` archive is valid.
///
/// Note: Prefer using new_with_* methods.
/// Note: Prefer using `new_with_*` methods.
pub async fn try_from_source(backend: B) -> Result<Self, PmtError> {
Self::try_from_cached_source(backend, NoCache).await
}
}

impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTilesReader<B, C> {
/// Creates a new cached reader from a specified source and validates the provided `PMTiles` archive is valid.
///
/// Note: Prefer using `new_with_*` methods.
pub async fn try_from_cached_source(backend: B, cache: C) -> Result<Self, PmtError> {
// Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory.
let mut initial_bytes = backend.read(0, MAX_INITIAL_BYTES).await?;
if initial_bytes.len() < HEADER_SIZE {
@@ -47,6 +64,7 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {

Ok(Self {
backend,
cache,
header,
root_directory,
})
@@ -130,7 +148,7 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}

/// Recursively locates a tile in the archive.
async fn find_tile_entry(&self, tile_id: u64) -> Option<Entry> {
async fn find_tile_entry(&self, tile_id: u64) -> Option<DirEntry> {
let entry = self.root_directory.find_tile_id(tile_id);
if let Some(entry) = entry {
if entry.is_leaf() {
@@ -141,15 +159,25 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}

#[async_recursion]
async fn find_entry_rec(&self, tile_id: u64, entry: &Entry, depth: u8) -> Option<Entry> {
async fn find_entry_rec(&self, tile_id: u64, entry: &DirEntry, depth: u8) -> Option<DirEntry> {
// the recursion is done as two functions because it is a bit cleaner,
// and it allows the directory to be cached later without cloning it first.
let offset = (self.header.leaf_offset + entry.offset) as _;
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id);

if let Some(entry) = entry {
let entry = match self.cache.get_dir_entry(offset, tile_id).await {
DirCacheResult::NotCached => {
// Cache miss - read from backend
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id).cloned();
self.cache.insert_dir(offset, dir).await;
entry
}
DirCacheResult::NotFound => None,
DirCacheResult::Found(entry) => Some(entry),
};

if let Some(ref entry) = entry {
if entry.is_leaf() {
return if depth <= 4 {
self.find_entry_rec(tile_id, entry, depth + 1).await
@@ -159,7 +187,7 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}
}

entry.cloned()
entry
}

async fn read_directory(&self, offset: usize, length: usize) -> Result<Directory, PmtError> {
@@ -191,26 +219,50 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}

#[cfg(feature = "http-async")]
impl AsyncPmTilesReader<HttpBackend> {
/// Creates a new PMTiles reader from a URL using the Reqwest backend.
impl AsyncPmTilesReader<HttpBackend, NoCache> {
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
///
/// Fails if `url` does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_url<U: IntoUrl>(client: Client, url: U) -> Result<Self, PmtError> {
Self::new_with_cached_url(NoCache, client, url).await
}
}

#[cfg(feature = "http-async")]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<HttpBackend, C> {
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
///
/// Fails if `url` does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_url<U: IntoUrl>(
cache: C,
client: Client,
url: U,
) -> Result<Self, PmtError> {
let backend = HttpBackend::try_from(client, url)?;

Self::try_from_source(backend).await
Self::try_from_cached_source(backend, cache).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl AsyncPmTilesReader<MmapBackend> {
/// Creates a new PMTiles reader from a file path using the async mmap backend.
impl AsyncPmTilesReader<MmapBackend, NoCache> {
/// Creates a new `PMTiles` reader from a file path using the async mmap backend.
///
/// Fails if `path` does not exist or is an invalid archive.
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> Result<Self, PmtError> {
Self::new_with_cached_path(NoCache, path).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
/// Creates a new cached `PMTiles` reader from a file path using the async mmap backend.
///
/// Fails if `path` does not exist or is an invalid archive.
pub async fn new_with_cached_path<P: AsRef<Path>>(cache: C, path: P) -> Result<Self, PmtError> {
let backend = MmapBackend::try_from(path).await?;

Self::try_from_source(backend).await
Self::try_from_cached_source(backend, cache).await
}
}
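
Putting the new constructors together, a usage sketch (inside some async fn, with the `mmap-async-tokio` feature enabled; the import paths and file name are assumptions, not confirmed by this diff):

use pmtiles::async_reader::AsyncPmTilesReader; // path assumed
use pmtiles::cache::HashMapCache;              // path assumed

let cache = HashMapCache::default();
let reader = AsyncPmTilesReader::new_with_cached_path(cache, "tiles.pmtiles").await?;

Because the cache type parameter defaults to `NoCache`, existing callers of `try_from_source`, `new_with_url`, and `new_with_path` keep compiling unchanged.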

65 changes: 65 additions & 0 deletions src/cache.rs
@@ -0,0 +1,65 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;

use crate::directory::{DirEntry, Directory};

pub enum DirCacheResult {
NotCached,
NotFound,
Found(DirEntry),
}

impl From<Option<&DirEntry>> for DirCacheResult {
fn from(entry: Option<&DirEntry>) -> Self {
match entry {
Some(entry) => DirCacheResult::Found(entry.clone()),
None => DirCacheResult::NotFound,
}
}
}

/// A cache for `PMTiles` directories.
#[async_trait]
pub trait DirectoryCache {
/// Get a directory from the cache, using the offset as a key.
async fn get_dir_entry(&self, offset: usize, tile_id: u64) -> DirCacheResult;

/// Insert a directory into the cache, using the offset as a key.
/// Note that cache must be internally mutable.
async fn insert_dir(&self, offset: usize, directory: Directory);
}

pub struct NoCache;

#[async_trait]
impl DirectoryCache for NoCache {
#[inline]
async fn get_dir_entry(&self, _offset: usize, _tile_id: u64) -> DirCacheResult {
DirCacheResult::NotCached
}

#[inline]
async fn insert_dir(&self, _offset: usize, _directory: Directory) {}
}

/// A simple `HashMap`-based implementation of a `PMTiles` directory cache.
#[derive(Default)]
pub struct HashMapCache {
pub cache: Arc<RwLock<HashMap<usize, Directory>>>,
}

#[async_trait]
impl DirectoryCache for HashMapCache {
async fn get_dir_entry(&self, offset: usize, tile_id: u64) -> DirCacheResult {
if let Some(dir) = self.cache.read().unwrap().get(&offset) {
return dir.find_tile_id(tile_id).into();
}
DirCacheResult::NotCached
}

async fn insert_dir(&self, offset: usize, directory: Directory) {
self.cache.write().unwrap().insert(offset, directory);
}
}
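
Note that `HashMapCache` never evicts, so it grows with every distinct leaf directory read. A sketch of a bounded variant on the same trait (hypothetical, reusing the imports at the top of this file; it evicts an arbitrary entry rather than tracking recency, and `find_tile_id` assumes one of the async features is enabled):

pub struct BoundedCache {
    max_dirs: usize,
    cache: Arc<RwLock<HashMap<usize, Directory>>>,
}

#[async_trait]
impl DirectoryCache for BoundedCache {
    async fn get_dir_entry(&self, offset: usize, tile_id: u64) -> DirCacheResult {
        if let Some(dir) = self.cache.read().unwrap().get(&offset) {
            return dir.find_tile_id(tile_id).into();
        }
        DirCacheResult::NotCached
    }

    async fn insert_dir(&self, offset: usize, directory: Directory) {
        let mut cache = self.cache.write().unwrap();
        if cache.len() >= self.max_dirs {
            // Evict an arbitrary entry; a real cache would track recency.
            let evict = cache.keys().next().copied();
            if let Some(k) = evict {
                cache.remove(&k);
            }
        }
        cache.insert(offset, directory);
    }
}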
34 changes: 18 additions & 16 deletions src/directory.rs
@@ -5,8 +5,9 @@ use varint_rs::VarintReader;

use crate::error::PmtError;

pub(crate) struct Directory {
entries: Vec<Entry>,
#[derive(Clone)]
pub struct Directory {
entries: Vec<DirEntry>,
}

impl Debug for Directory {
@@ -17,7 +18,8 @@ impl Debug for Directory {

impl Directory {
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub fn find_tile_id(&self, tile_id: u64) -> Option<&Entry> {
#[must_use]
pub fn find_tile_id(&self, tile_id: u64) -> Option<&DirEntry> {
match self.entries.binary_search_by(|e| e.tile_id.cmp(&tile_id)) {
Ok(idx) => self.entries.get(idx),
Err(next_id) => {
@@ -26,7 +28,7 @@ impl Directory {
if next_id > 0 {
let previous_tile = self.entries.get(next_id - 1)?;
if previous_tile.is_leaf()
|| tile_id - previous_tile.tile_id < previous_tile.run_length as u64
|| tile_id - previous_tile.tile_id < u64::from(previous_tile.run_length)
{
return Some(previous_tile);
}
@@ -44,32 +46,32 @@ impl TryFrom<Bytes> for Directory {
let mut buffer = buffer.reader();
let n_entries = buffer.read_usize_varint()?;

let mut entries = vec![Entry::default(); n_entries];
let mut entries = vec![DirEntry::default(); n_entries];

// Read tile IDs
let mut next_tile_id = 0;
for entry in entries.iter_mut() {
for entry in &mut entries {
next_tile_id += buffer.read_u64_varint()?;
entry.tile_id = next_tile_id;
}

// Read Run Lengths
for entry in entries.iter_mut() {
for entry in &mut entries {
entry.run_length = buffer.read_u32_varint()?;
}

// Read Lengths
for entry in entries.iter_mut() {
for entry in &mut entries {
entry.length = buffer.read_u32_varint()?;
}

// Read Offsets
let mut last_entry: Option<&Entry> = None;
for entry in entries.iter_mut() {
let mut last_entry: Option<&DirEntry> = None;
for entry in &mut entries {
let offset = buffer.read_u64_varint()?;
entry.offset = if offset == 0 {
let e = last_entry.ok_or(PmtError::InvalidEntry)?;
e.offset + e.length as u64
e.offset + u64::from(e.length)
} else {
offset - 1
};
@@ -81,16 +83,16 @@ impl TryFrom<Bytes> for Directory {
}

#[derive(Clone, Default, Debug)]
pub(crate) struct Entry {
pub struct DirEntry {
pub(crate) tile_id: u64,
pub(crate) offset: u64,
pub(crate) length: u32,
pub(crate) run_length: u32,
}

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
impl Entry {
pub fn is_leaf(&self) -> bool {
impl DirEntry {
pub(crate) fn is_leaf(&self) -> bool {
self.run_length == 0
}
}
@@ -115,7 +117,7 @@ mod tests {
reader.read_exact(header_bytes.as_mut()).unwrap();

let header = Header::try_from_bytes(header_bytes.freeze()).unwrap();
let mut directory_bytes = BytesMut::zeroed(header.root_length as usize);
let mut directory_bytes = BytesMut::zeroed(usize::try_from(header.root_length).unwrap());
reader.read_exact(directory_bytes.as_mut()).unwrap();

let mut decompressed = BytesMut::zeroed(directory_bytes.len() * 2);
@@ -135,7 +137,7 @@
// ...it breaks pattern on the 59th tile
assert_eq!(directory.entries[58].tile_id, 58);
assert_eq!(directory.entries[58].run_length, 2);
assert_eq!(directory.entries[58].offset, 422070);
assert_eq!(directory.entries[58].offset, 422_070);
assert_eq!(directory.entries[58].length, 850);
}
}
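
One more worked check of the `run_length` fallback in `find_tile_id`, reusing the `directory` fixture above (sketch; it relies on the entry layout the existing assertions imply):

// Tile 59 has no entry of its own, but entry 58 has run_length 2, so the
// Err branch of the binary search resolves tile 59 to the previous entry:
let hit = directory.find_tile_id(59).unwrap();
assert_eq!(hit.tile_id, 58);
assert_eq!(hit.offset, 422_070);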
