Skip to content

Commit

Permalink
Add basic write support
Browse files Browse the repository at this point in the history
  • Loading branch information
pka committed Oct 19, 2024
1 parent 87c30f1 commit 5078e8e
Show file tree
Hide file tree
Showing 7 changed files with 403 additions and 13 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rust-version = "1.81.0"
categories = ["science::geo"]

[features]
default = []
default = ["__writer"]
http-async = ["__async", "dep:reqwest"]
mmap-async-tokio = ["__async", "dep:fmmap", "fmmap?/tokio-async"]
s3-async-native = ["__async-s3", "__async-s3-nativetls"]
Expand All @@ -32,12 +32,15 @@ __async-s3 = ["__async", "dep:rust-s3"]
__async-s3-nativetls = ["rust-s3?/use-tokio-native-tls"]
__async-s3-rustls = ["rust-s3?/tokio-rustls-tls"]
__async-aws-s3 = ["__async", "dep:aws-sdk-s3"]
__writer = []

[dependencies]
# TODO: determine how we want to handle compression in async & sync environments
ahash = { version = "0.8.11", default-features = false, features = ["std","no-rng"] }
aws-sdk-s3 = { version = "1.49.0", optional = true }
async-compression = { version = "0.4", features = ["gzip"] }
bytes = "1"
flate2 = "1"
fmmap = { version = "0.3", default-features = false, optional = true }
hilbert_2d = "1"
reqwest = { version = "0.12.4", default-features = false, optional = true }
Expand All @@ -50,9 +53,9 @@ tokio = { version = "1", default-features = false, features = ["io-util"], optio
varint-rs = "2"

[dev-dependencies]
flate2 = "1"
fmmap = { version = "0.3", features = ["tokio-async"] }
reqwest = { version = "0.12.4", features = ["rustls-tls-webpki-roots"] }
tempfile = "3.13.0"
tokio = { version = "1", features = ["test-util", "macros", "rt"] }

[package.metadata.docs.rs]
Expand Down
3 changes: 3 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ test:
cargo test
RUSTDOCFLAGS="-D warnings" cargo doc --no-deps

# Run the test suite with the async mmap backend enabled
# (presumably so writer tests can read back what they wrote — TODO confirm;
# note the `__writer` feature itself is already in the default set)
test_writer:
cargo test --features mmap-async-tokio

# Run all tests and checks
test-all: check fmt clippy

Expand Down
7 changes: 7 additions & 0 deletions src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTile
/// Fetches tile bytes from the archive.
pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> PmtResult<Option<Bytes>> {
    // Resolve the (z, x, y) coordinate to its tile ID and delegate.
    self.get_tile_by_id(tile_id(z, x, y)).await
}

pub(crate) async fn get_tile_by_id(&self, tile_id: u64) -> PmtResult<Option<Bytes>> {
let Some(entry) = self.find_tile_entry(tile_id).await? else {
return Ok(None);
};
Expand Down Expand Up @@ -206,6 +210,9 @@ impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTile
.read_to_end(&mut decompressed_bytes)
.await?;
}
Compression::None => {
return Ok(bytes);
}
v => Err(UnsupportedCompression(v))?,
}

Expand Down
76 changes: 69 additions & 7 deletions src/directory.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::fmt::{Debug, Formatter};
use std::{io, io::Write};

use bytes::{Buf, Bytes};
use varint_rs::VarintReader;
use varint_rs::{VarintReader, VarintWriter};

use crate::error::PmtError;

#[derive(Clone)]
#[derive(Default, Clone)]
pub struct Directory {
entries: Vec<DirEntry>,
}
Expand Down Expand Up @@ -43,6 +44,10 @@ impl Directory {
pub fn get_approx_byte_size(&self) -> usize {
self.entries.capacity() * size_of::<DirEntry>()
}

/// Appends an entry to the directory, preserving insertion order.
pub(crate) fn push(&mut self, entry: DirEntry) {
self.entries.push(entry);
}
}

impl TryFrom<Bytes> for Directory {
Expand Down Expand Up @@ -88,6 +93,44 @@ impl TryFrom<Bytes> for Directory {
}
}

impl Directory {
    /// Serializes the directory in the PMTiles v3 column-oriented layout:
    /// entry count, delta-encoded tile IDs, run lengths, lengths, then offsets.
    ///
    /// # Errors
    /// Returns any I/O error raised by `writer`.
    pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        // Write number of entries
        writer.write_usize_varint(self.entries.len())?;

        // Write tile IDs as deltas from the previous ID (entries are sorted).
        let mut last_tile_id = 0;
        for entry in &self.entries {
            writer.write_u64_varint(entry.tile_id - last_tile_id)?;
            last_tile_id = entry.tile_id;
        }

        // Write Run Lengths
        for entry in &self.entries {
            writer.write_u32_varint(entry.run_length)?;
        }

        // Write Lengths
        for entry in &self.entries {
            writer.write_u32_varint(entry.length)?;
        }

        // Write Offsets. Per the spec, a serialized 0 means "this entry starts
        // where the PREVIOUS entry ended" (previous offset + previous length);
        // any other value is the real offset + 1. The previous code compared
        // against the CURRENT entry's length, which could emit a spurious 0
        // (corrupting the directory on read-back) whenever
        // offset == prev_offset + length while lengths differ. It also relied
        // on coincidence to avoid emitting 0 for the first entry, which the
        // reader cannot resolve (there is no previous entry). Tracking the
        // previous entry's end as an Option fixes both.
        let mut prev_end: Option<u64> = None;
        for entry in &self.entries {
            let offset_to_write = match prev_end {
                Some(end) if entry.offset == end => 0,
                _ => entry.offset + 1,
            };
            writer.write_u64_varint(offset_to_write)?;
            prev_end = Some(entry.offset + u64::from(entry.length));
        }

        Ok(())
    }
}

#[derive(Clone, Default, Debug)]
pub struct DirEntry {
pub(crate) tile_id: u64,
Expand All @@ -106,16 +149,15 @@ impl DirEntry {
mod tests {
use std::io::{BufReader, Read, Write};

use bytes::BytesMut;
use bytes::{Bytes, BytesMut};

use super::Directory;
use crate::header::HEADER_SIZE;
use crate::tests::RASTER_FILE;
use crate::Header;

#[test]
fn read_root_directory() {
let test_file = std::fs::File::open(RASTER_FILE).unwrap();
fn read_root_directory(fname: &str) -> Directory {
let test_file = std::fs::File::open(fname).unwrap();
let mut reader = BufReader::new(test_file);

let mut header_bytes = BytesMut::zeroed(HEADER_SIZE);
Expand All @@ -131,8 +173,12 @@ mod tests {
gunzip.write_all(&directory_bytes).unwrap();
}

let directory = Directory::try_from(decompressed.freeze()).unwrap();
Directory::try_from(decompressed.freeze()).unwrap()
}

#[test]
fn root_directory() {
let directory = read_root_directory(RASTER_FILE);
assert_eq!(directory.entries.len(), 84);
// Note: this is not true for all tiles, just the first few...
for nth in 0..10 {
Expand All @@ -145,4 +191,20 @@ mod tests {
assert_eq!(directory.entries[58].offset, 422_070);
assert_eq!(directory.entries[58].length, 850);
}

#[test]
fn write_directory() {
    // Round-trip: serialize the root directory, parse it back, and verify
    // every entry survives unchanged.
    let root_dir = read_root_directory(RASTER_FILE);
    let mut buf = vec![];
    root_dir.write_to(&mut buf).unwrap();
    let dir = Directory::try_from(Bytes::from(buf)).unwrap();

    // The previous `all()` over root_dir's indices silently accepted a parsed
    // directory with EXTRA trailing entries; assert the counts match first.
    assert_eq!(root_dir.entries.len(), dir.entries.len());
    // Per-field asserts (instead of one boolean) pinpoint the first mismatch.
    for (idx, (orig, restored)) in root_dir.entries.iter().zip(&dir.entries).enumerate() {
        assert_eq!(orig.tile_id, restored.tile_id, "tile_id mismatch at {idx}");
        assert_eq!(orig.run_length, restored.run_length, "run_length mismatch at {idx}");
        assert_eq!(orig.offset, restored.offset, "offset mismatch at {idx}");
        assert_eq!(orig.length, restored.length, "length mismatch at {idx}");
    }
}
}
77 changes: 73 additions & 4 deletions src/header.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::num::NonZeroU64;
use std::num::{NonZero, NonZeroU64};
use std::panic::catch_unwind;
use std::{io, io::Write};

use bytes::{Buf, Bytes};

use crate::error::{PmtError, PmtResult};

#[cfg(feature = "__async")]
#[cfg(any(feature = "__async", feature = "__writer"))]
pub(crate) const MAX_INITIAL_BYTES: usize = 16_384;
#[cfg(any(test, feature = "__async"))]
#[cfg(any(test, feature = "__async", feature = "__writer"))]
pub(crate) const HEADER_SIZE: usize = 127;

#[allow(dead_code)]
Expand Down Expand Up @@ -144,7 +145,7 @@ impl TryInto<TileType> for u8 {
}
}

static V3_MAGIC: &str = "PMTiles";
pub(crate) static V3_MAGIC: &str = "PMTiles";
static V2_MAGIC: &str = "PM";

impl Header {
Expand Down Expand Up @@ -200,6 +201,47 @@ impl Header {
}
}

impl Header {
    /// Serializes this header in the fixed 127-byte PMTiles v3 layout:
    /// 7-byte magic, 1-byte version, eight little-endian `u64` section
    /// offsets/lengths, three `u64` tile counts (0 when unknown), five
    /// single-byte flags/enums, two zoom bytes, the bounds as four `i32`
    /// E7 fixed-point coordinates, the center zoom, and the center position.
    ///
    /// # Errors
    /// Returns any I/O error raised by `writer`.
    pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        // Magic number identifying a v3 archive.
        writer.write_all(V3_MAGIC.as_bytes())?;
        writer.write_all(&[self.version])?;

        // Section table: each offset/length pair as little-endian u64.
        for section_field in [
            self.root_offset,
            self.root_length,
            self.metadata_offset,
            self.metadata_length,
            self.leaf_offset,
            self.leaf_length,
            self.data_offset,
            self.data_length,
        ] {
            writer.write_all(&section_field.to_le_bytes())?;
        }

        // Tile statistics: a `None` count is stored as 0 ("unknown").
        for count in [
            self.n_addressed_tiles,
            self.n_tile_entries,
            self.n_tile_contents,
        ] {
            writer.write_all(&count.map_or(0, NonZero::get).to_le_bytes())?;
        }

        // Single-byte flags, enum discriminants, and zoom levels.
        writer.write_all(&[u8::from(self.clustered)])?;
        writer.write_all(&[self.internal_compression as u8])?;
        writer.write_all(&[self.tile_compression as u8])?;
        writer.write_all(&[self.tile_type as u8])?;
        writer.write_all(&[self.min_zoom])?;
        writer.write_all(&[self.max_zoom])?;

        // Bounding box, then the center position, as E7 fixed-point integers.
        for coordinate in [
            self.min_longitude,
            self.min_latitude,
            self.max_longitude,
            self.max_latitude,
        ] {
            Self::write_coordinate_part(writer, coordinate)?;
        }
        writer.write_all(&[self.center_zoom])?;
        Self::write_coordinate_part(writer, self.center_longitude)?;
        Self::write_coordinate_part(writer, self.center_latitude)?;

        Ok(())
    }

    /// Encodes a degree coordinate as a little-endian `i32` scaled by 1e7.
    #[allow(clippy::cast_possible_truncation)]
    fn write_coordinate_part<W: Write>(writer: &mut W, value: f32) -> io::Result<()> {
        writer.write_all(&((value * 10_000_000.0) as i32).to_le_bytes())
    }
}

#[cfg(test)]
mod tests {
#![allow(clippy::unreadable_literal, clippy::float_cmp)]
Expand Down Expand Up @@ -303,4 +345,31 @@ mod tests {
))
);
}

#[test]
fn write_header() {
    // Round-trip: parse the header of the test archive, re-serialize it,
    // parse the result, and verify the fields survive.
    let mut test = File::open(RASTER_FILE).unwrap();
    let mut header_bytes = [0; HEADER_SIZE];
    test.read_exact(header_bytes.as_mut_slice()).unwrap();
    let header = Header::try_from_bytes(Bytes::copy_from_slice(&header_bytes)).unwrap();

    let mut buf = vec![];
    header.write_to(&mut buf).unwrap();
    let out = Header::try_from_bytes(Bytes::from(buf)).unwrap();
    assert_eq!(header.version, out.version);
    // Section table (bytes 8..=71) — previously unchecked by this test,
    // so a writer bug in these eight fields would have gone undetected.
    assert_eq!(header.root_offset, out.root_offset);
    assert_eq!(header.root_length, out.root_length);
    assert_eq!(header.metadata_offset, out.metadata_offset);
    assert_eq!(header.metadata_length, out.metadata_length);
    assert_eq!(header.leaf_offset, out.leaf_offset);
    assert_eq!(header.leaf_length, out.leaf_length);
    assert_eq!(header.data_offset, out.data_offset);
    assert_eq!(header.data_length, out.data_length);
    assert_eq!(header.tile_type, out.tile_type);
    assert_eq!(header.n_addressed_tiles, out.n_addressed_tiles);
    assert_eq!(header.n_tile_entries, out.n_tile_entries);
    assert_eq!(header.n_tile_contents, out.n_tile_contents);
    assert_eq!(header.min_zoom, out.min_zoom);
    assert_eq!(header.max_zoom, out.max_zoom);
    assert_eq!(header.center_zoom, out.center_zoom);
    assert_eq!(header.center_latitude, out.center_latitude);
    assert_eq!(header.center_longitude, out.center_longitude);
    assert_eq!(header.min_latitude, out.min_latitude);
    assert_eq!(header.max_latitude, out.max_latitude);
    assert_eq!(header.min_longitude, out.min_longitude);
    assert_eq!(header.max_longitude, out.max_longitude);
    assert_eq!(header.clustered, out.clustered);
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ mod error;
mod header;
#[cfg(feature = "__async")]
mod tile;
#[cfg(feature = "__writer")]
pub mod writer;

#[cfg(feature = "aws-s3-async")]
pub use backend_aws_s3::AwsS3Backend;
Expand Down
Loading

0 comments on commit 5078e8e

Please sign in to comment.