[wip] add hashmap-based directory cache
this is a first pass of the caching
nyurik committed Nov 8, 2023
1 parent 2cb2cfe commit b6c853a
Showing 5 changed files with 136 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -15,6 +15,7 @@ default = []
http-async = ["dep:tokio", "dep:reqwest"]
mmap-async-tokio = ["dep:tokio", "dep:fmmap", "fmmap?/tokio-async"]
tilejson = ["dep:tilejson", "dep:serde", "dep:serde_json"]
cache = []

# TODO: support other async libraries

74 changes: 62 additions & 12 deletions src/async_reader.rs
@@ -9,6 +9,9 @@ use reqwest::{Client, IntoUrl};
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
use tokio::io::AsyncReadExt;

use crate::cache::SearchResult;
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
use crate::cache::{Cache, NoCache};
use crate::directory::{Directory, Entry};
use crate::error::Error;
use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
@@ -19,17 +22,27 @@ use crate::mmap::MmapBackend;
use crate::tile::tile_id;
use crate::{Compression, Header};

pub struct AsyncPmTilesReader<B> {
pub struct AsyncPmTilesReader<B, C> {
backend: B,
cache: C,
header: Header,
root_directory: Directory,
}

impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
/// Creates a new reader from a specified source and validates the provided PMTiles archive is valid.
impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B, NoCache> {
/// Creates a new reader from a specified source and validates the provided PMTiles archive.
///
/// Note: Prefer using new_with_* methods.
pub async fn try_from_source(backend: B) -> Result<Self, Error> {
Self::try_from_cached_source(backend, NoCache).await
}
}

impl<B: AsyncBackend + Sync + Send, C: Cache + Sync + Send> AsyncPmTilesReader<B, C> {
/// Creates a new reader with the given cache from a specified source and validates the provided PMTiles archive.
///
/// Note: Prefer using new_with_* methods.
pub async fn try_from_cached_source(backend: B, cache: C) -> Result<Self, Error> {
// Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory.
let mut initial_bytes = backend.read(0, MAX_INITIAL_BYTES).await?;
if initial_bytes.len() < HEADER_SIZE {
@@ -47,11 +60,14 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {

Ok(Self {
backend,
cache,
header,
root_directory,
})
}
}

impl<B: AsyncBackend + Sync + Send, C: Cache + Sync + Send> AsyncPmTilesReader<B, C> {
/// Fetches tile bytes from the archive.
pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> Option<Bytes> {
let tile_id = tile_id(z, x, y);
@@ -137,11 +153,21 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
// the recursion is done as two functions because it is a bit cleaner,
// and it allows the directory to be cached later without cloning it first.
let offset = (self.header.leaf_offset + entry.offset) as _;
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id);

if let Some(entry) = entry {
let entry = match self.cache.get_dir_entry(offset, tile_id) {
SearchResult::NotCached => {
// Cache miss - read from backend
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id).cloned();
self.cache.insert_directory(offset, dir);
entry
}
SearchResult::NotFound => None,
SearchResult::Found(entry) => Some(entry),
};

if let Some(ref entry) = entry {
if entry.is_leaf() {
return if depth <= 4 {
self.find_entry_rec(tile_id, entry, depth + 1).await
@@ -151,7 +177,7 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}
}

entry.cloned()
entry
}

async fn read_directory(&self, offset: usize, length: usize) -> Result<Directory, Error> {
@@ -183,26 +209,50 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
}

#[cfg(feature = "http-async")]
impl AsyncPmTilesReader<HttpBackend> {
impl AsyncPmTilesReader<HttpBackend, NoCache> {
/// Creates a new PMTiles reader from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_url<U: IntoUrl>(client: Client, url: U) -> Result<Self, Error> {
Self::new_with_cached_url(client, url, NoCache).await
}
}

#[cfg(feature = "http-async")]
impl<C: Cache + Sync + Send> AsyncPmTilesReader<HttpBackend, C> {
/// Creates a new PMTiles reader with cache from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_url<U: IntoUrl>(
client: Client,
url: U,
cache: C,
) -> Result<Self, Error> {
let backend = HttpBackend::try_from(client, url)?;

Self::try_from_source(backend).await
Self::try_from_cached_source(backend, cache).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl AsyncPmTilesReader<MmapBackend> {
impl AsyncPmTilesReader<MmapBackend, NoCache> {
/// Creates a new PMTiles reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
Self::new_with_cached_path(path, NoCache).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl<C: Cache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
/// Creates a new cached PMTiles reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_cached_path<P: AsRef<Path>>(path: P, cache: C) -> Result<Self, Error> {
let backend = MmapBackend::try_from(path).await?;

Self::try_from_source(backend).await
Self::try_from_cached_source(backend, cache).await
}
}

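A minimal usage sketch of the resulting API (not part of the commit; it assumes the crate is consumed as pmtiles with the mmap-async-tokio and cache features enabled, and tiles.pmtiles and open_readers are placeholder names — HashMapCache is defined in src/cache.rs below). Existing call sites keep compiling through the NoCache wrappers, while the *_cached_* constructors accept any Cache implementation:

use pmtiles::async_reader::AsyncPmTilesReader;
use pmtiles::cache::HashMapCache;

async fn open_readers() {
    // Uncached, as before: this resolves to AsyncPmTilesReader<MmapBackend, NoCache>.
    let plain = AsyncPmTilesReader::new_with_path("tiles.pmtiles")
        .await
        .expect("valid archive");

    // Cached: directories read from the backend are stored in the HashMapCache
    // and looked up by offset on subsequent get_tile calls.
    let cached = AsyncPmTilesReader::new_with_cached_path("tiles.pmtiles", HashMapCache::default())
        .await
        .expect("valid archive");

    // Same tile API either way; only the directory lookups differ internally.
    let a = plain.get_tile(6, 23, 35).await;
    let b = cached.get_tile(6, 23, 35).await;
    assert_eq!(a.is_some(), b.is_some());
}
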
67 changes: 67 additions & 0 deletions src/cache.rs
@@ -0,0 +1,67 @@
use crate::directory::{Directory, Entry};

pub enum SearchResult {
NotCached,
NotFound,
Found(Entry),
}

impl From<Option<&Entry>> for SearchResult {
fn from(entry: Option<&Entry>) -> Self {
match entry {
Some(entry) => SearchResult::Found(entry.clone()),
None => SearchResult::NotFound,
}
}
}

/// A cache for PMTiles directories.
pub trait Cache {
/// Clear the cache.
fn clear(&mut self);

/// Look up a tile entry in the directory cached at the given offset.
fn get_dir_entry(&self, offset: usize, tile_id: u64) -> SearchResult;

/// Insert a directory into the cache, using the offset as a key.
/// Note that the cache must use interior mutability, since this method takes `&self`.
fn insert_directory(&self, offset: usize, directory: Directory);
}

pub struct NoCache;

impl Cache for NoCache {
fn clear(&mut self) {}

#[inline]
fn get_dir_entry(&self, _offset: usize, _tile_id: u64) -> SearchResult {
SearchResult::NotCached
}

#[inline]
fn insert_directory(&self, _offset: usize, _directory: Directory) {}
}

#[cfg(feature = "cache")]
#[derive(Default)]
pub struct HashMapCache {
cache: std::sync::Arc<std::sync::Mutex<std::collections::HashMap<usize, Directory>>>,
}

#[cfg(feature = "cache")]
impl Cache for HashMapCache {
fn clear(&mut self) {
self.cache.lock().unwrap().clear();
}

fn get_dir_entry(&self, offset: usize, tile_id: u64) -> SearchResult {
if let Some(dir) = self.cache.lock().unwrap().get(&offset) {
return dir.find_tile_id(tile_id).into();
}
SearchResult::NotCached
}

fn insert_directory(&self, offset: usize, directory: Directory) {
self.cache.lock().unwrap().insert(offset, directory);
}
}
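
Because get_dir_entry and insert_directory both take &self, any Cache implementation needs interior mutability; HashMapCache gets it from an Arc<Mutex<HashMap<usize, Directory>>>. As a hedged sketch of an alternative (hypothetical, not in this commit), a crude size-bounded cache living next to HashMapCache in src/cache.rs could look like the following, where BoundedCache and max_dirs are invented names:

// Hypothetical: a crude bounded cache that stops inserting once it holds
// max_dirs directories. It reuses the same SearchResult conversion as HashMapCache.
pub struct BoundedCache {
    max_dirs: usize,
    cache: std::sync::Mutex<std::collections::HashMap<usize, Directory>>,
}

impl BoundedCache {
    pub fn new(max_dirs: usize) -> Self {
        Self { max_dirs, cache: Default::default() }
    }
}

impl Cache for BoundedCache {
    fn clear(&mut self) {
        self.cache.lock().unwrap().clear();
    }

    fn get_dir_entry(&self, offset: usize, tile_id: u64) -> SearchResult {
        match self.cache.lock().unwrap().get(&offset) {
            // From<Option<&Entry>> maps Some to Found(entry.clone()) and None to NotFound.
            Some(dir) => dir.find_tile_id(tile_id).into(),
            None => SearchResult::NotCached,
        }
    }

    fn insert_directory(&self, offset: usize, directory: Directory) {
        let mut cache = self.cache.lock().unwrap();
        if cache.len() < self.max_dirs {
            cache.insert(offset, directory);
        }
    }
}
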
4 changes: 2 additions & 2 deletions src/directory.rs
@@ -5,7 +5,7 @@ use varint_rs::VarintReader;

use crate::error::Error;

pub(crate) struct Directory {
pub struct Directory {
entries: Vec<Entry>,
}

@@ -81,7 +81,7 @@ impl TryFrom<Bytes> for Directory {
}

#[derive(Clone, Default, Debug)]
pub(crate) struct Entry {
pub struct Entry {
pub(crate) tile_id: u64,
pub(crate) offset: u64,
pub(crate) length: u32,
4 changes: 4 additions & 0 deletions src/lib.rs
@@ -14,6 +14,10 @@ pub mod mmap;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub mod async_reader;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub mod cache;

pub mod tile;

#[cfg(test)]
