From 5078e8ecb18671a432afbb2d427f321b757da33c Mon Sep 17 00:00:00 2001 From: Pirmin Kalberer Date: Sat, 19 Oct 2024 15:28:21 +0200 Subject: [PATCH] Add basic write support --- Cargo.toml | 7 +- justfile | 3 + src/async_reader.rs | 7 ++ src/directory.rs | 76 ++++++++++++-- src/header.rs | 77 +++++++++++++- src/lib.rs | 2 + src/writer.rs | 244 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 403 insertions(+), 13 deletions(-) create mode 100644 src/writer.rs diff --git a/Cargo.toml b/Cargo.toml index babc2d0..5205246 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ rust-version = "1.81.0" categories = ["science::geo"] [features] -default = [] +default = ["__writer"] http-async = ["__async", "dep:reqwest"] mmap-async-tokio = ["__async", "dep:fmmap", "fmmap?/tokio-async"] s3-async-native = ["__async-s3", "__async-s3-nativetls"] @@ -32,12 +32,15 @@ __async-s3 = ["__async", "dep:rust-s3"] __async-s3-nativetls = ["rust-s3?/use-tokio-native-tls"] __async-s3-rustls = ["rust-s3?/tokio-rustls-tls"] __async-aws-s3 = ["__async", "dep:aws-sdk-s3"] +__writer = [] [dependencies] # TODO: determine how we want to handle compression in async & sync environments +ahash = { version = "0.8.11", default-features = false, features = ["std","no-rng"] } aws-sdk-s3 = { version = "1.49.0", optional = true } async-compression = { version = "0.4", features = ["gzip"] } bytes = "1" +flate2 = "1" fmmap = { version = "0.3", default-features = false, optional = true } hilbert_2d = "1" reqwest = { version = "0.12.4", default-features = false, optional = true } @@ -50,9 +53,9 @@ tokio = { version = "1", default-features = false, features = ["io-util"], optio varint-rs = "2" [dev-dependencies] -flate2 = "1" fmmap = { version = "0.3", features = ["tokio-async"] } reqwest = { version = "0.12.4", features = ["rustls-tls-webpki-roots"] } +tempfile = "3.13.0" tokio = { version = "1", features = ["test-util", "macros", "rt"] } [package.metadata.docs.rs] diff --git a/justfile b/justfile index 1038eb0..3402fe7 100644 --- a/justfile +++ b/justfile @@ -21,6 +21,9 @@ test: cargo test RUSTDOCFLAGS="-D warnings" cargo doc --no-deps +test_writer: + cargo test --features mmap-async-tokio + # Run all tests and checks test-all: check fmt clippy diff --git a/src/async_reader.rs b/src/async_reader.rs index 12c48c0..d68158c 100644 --- a/src/async_reader.rs +++ b/src/async_reader.rs @@ -65,6 +65,10 @@ impl AsyncPmTile /// Fetches tile bytes from the archive. pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> PmtResult> { let tile_id = tile_id(z, x, y); + self.get_tile_by_id(tile_id).await + } + + pub(crate) async fn get_tile_by_id(&self, tile_id: u64) -> PmtResult> { let Some(entry) = self.find_tile_entry(tile_id).await? else { return Ok(None); }; @@ -206,6 +210,9 @@ impl AsyncPmTile .read_to_end(&mut decompressed_bytes) .await?; } + Compression::None => { + return Ok(bytes); + } v => Err(UnsupportedCompression(v))?, } diff --git a/src/directory.rs b/src/directory.rs index a839a91..f7695be 100644 --- a/src/directory.rs +++ b/src/directory.rs @@ -1,11 +1,12 @@ use std::fmt::{Debug, Formatter}; +use std::{io, io::Write}; use bytes::{Buf, Bytes}; -use varint_rs::VarintReader; +use varint_rs::{VarintReader, VarintWriter}; use crate::error::PmtError; -#[derive(Clone)] +#[derive(Default, Clone)] pub struct Directory { entries: Vec, } @@ -43,6 +44,10 @@ impl Directory { pub fn get_approx_byte_size(&self) -> usize { self.entries.capacity() * size_of::() } + + pub(crate) fn push(&mut self, entry: DirEntry) { + self.entries.push(entry); + } } impl TryFrom for Directory { @@ -88,6 +93,44 @@ impl TryFrom for Directory { } } +impl Directory { + pub fn write_to(&self, writer: &mut W) -> io::Result<()> { + // Write number of entries + writer.write_usize_varint(self.entries.len())?; + + // Write tile IDs + let mut last_tile_id = 0; + for entry in &self.entries { + writer.write_u64_varint(entry.tile_id - last_tile_id)?; + last_tile_id = entry.tile_id; + } + + // Write Run Lengths + for entry in &self.entries { + writer.write_u32_varint(entry.run_length)?; + } + + // Write Lengths + for entry in &self.entries { + writer.write_u32_varint(entry.length)?; + } + + // Write Offsets + let mut last_offset = 0; + for entry in &self.entries { + let offset_to_write = if entry.offset == last_offset + u64::from(entry.length) { + 0 + } else { + entry.offset + 1 + }; + writer.write_u64_varint(offset_to_write)?; + last_offset = entry.offset; + } + + Ok(()) + } +} + #[derive(Clone, Default, Debug)] pub struct DirEntry { pub(crate) tile_id: u64, @@ -106,16 +149,15 @@ impl DirEntry { mod tests { use std::io::{BufReader, Read, Write}; - use bytes::BytesMut; + use bytes::{Bytes, BytesMut}; use super::Directory; use crate::header::HEADER_SIZE; use crate::tests::RASTER_FILE; use crate::Header; - #[test] - fn read_root_directory() { - let test_file = std::fs::File::open(RASTER_FILE).unwrap(); + fn read_root_directory(fname: &str) -> Directory { + let test_file = std::fs::File::open(fname).unwrap(); let mut reader = BufReader::new(test_file); let mut header_bytes = BytesMut::zeroed(HEADER_SIZE); @@ -131,8 +173,12 @@ mod tests { gunzip.write_all(&directory_bytes).unwrap(); } - let directory = Directory::try_from(decompressed.freeze()).unwrap(); + Directory::try_from(decompressed.freeze()).unwrap() + } + #[test] + fn root_directory() { + let directory = read_root_directory(RASTER_FILE); assert_eq!(directory.entries.len(), 84); // Note: this is not true for all tiles, just the first few... for nth in 0..10 { @@ -145,4 +191,20 @@ mod tests { assert_eq!(directory.entries[58].offset, 422_070); assert_eq!(directory.entries[58].length, 850); } + + #[test] + fn write_directory() { + let root_dir = read_root_directory(RASTER_FILE); + let mut buf = vec![]; + root_dir.write_to(&mut buf).unwrap(); + let dir = Directory::try_from(Bytes::from(buf)).unwrap(); + assert!(root_dir + .entries + .iter() + .enumerate() + .all(|(idx, entry)| dir.entries[idx].tile_id == entry.tile_id + && dir.entries[idx].run_length == entry.run_length + && dir.entries[idx].offset == entry.offset + && dir.entries[idx].length == entry.length)); + } } diff --git a/src/header.rs b/src/header.rs index f51aed3..d47863e 100644 --- a/src/header.rs +++ b/src/header.rs @@ -1,13 +1,14 @@ -use std::num::NonZeroU64; +use std::num::{NonZero, NonZeroU64}; use std::panic::catch_unwind; +use std::{io, io::Write}; use bytes::{Buf, Bytes}; use crate::error::{PmtError, PmtResult}; -#[cfg(feature = "__async")] +#[cfg(any(feature = "__async", feature = "__writer"))] pub(crate) const MAX_INITIAL_BYTES: usize = 16_384; -#[cfg(any(test, feature = "__async"))] +#[cfg(any(test, feature = "__async", feature = "__writer"))] pub(crate) const HEADER_SIZE: usize = 127; #[allow(dead_code)] @@ -144,7 +145,7 @@ impl TryInto for u8 { } } -static V3_MAGIC: &str = "PMTiles"; +pub(crate) static V3_MAGIC: &str = "PMTiles"; static V2_MAGIC: &str = "PM"; impl Header { @@ -200,6 +201,47 @@ impl Header { } } +impl Header { + pub fn write_to(&self, writer: &mut W) -> io::Result<()> { + // Write magic number + writer.write_all(V3_MAGIC.as_bytes())?; + + // Write header fields + writer.write_all(&[self.version])?; + writer.write_all(&self.root_offset.to_le_bytes())?; + writer.write_all(&self.root_length.to_le_bytes())?; + writer.write_all(&self.metadata_offset.to_le_bytes())?; + writer.write_all(&self.metadata_length.to_le_bytes())?; + writer.write_all(&self.leaf_offset.to_le_bytes())?; + writer.write_all(&self.leaf_length.to_le_bytes())?; + writer.write_all(&self.data_offset.to_le_bytes())?; + writer.write_all(&self.data_length.to_le_bytes())?; + writer.write_all(&self.n_addressed_tiles.map_or(0, NonZero::get).to_le_bytes())?; + writer.write_all(&self.n_tile_entries.map_or(0, NonZero::get).to_le_bytes())?; + writer.write_all(&self.n_tile_contents.map_or(0, NonZero::get).to_le_bytes())?; + writer.write_all(&[u8::from(self.clustered)])?; + writer.write_all(&[self.internal_compression as u8])?; + writer.write_all(&[self.tile_compression as u8])?; + writer.write_all(&[self.tile_type as u8])?; + writer.write_all(&[self.min_zoom])?; + writer.write_all(&[self.max_zoom])?; + Self::write_coordinate_part(writer, self.min_longitude)?; + Self::write_coordinate_part(writer, self.min_latitude)?; + Self::write_coordinate_part(writer, self.max_longitude)?; + Self::write_coordinate_part(writer, self.max_latitude)?; + writer.write_all(&[self.center_zoom])?; + Self::write_coordinate_part(writer, self.center_longitude)?; + Self::write_coordinate_part(writer, self.center_latitude)?; + + Ok(()) + } + + #[allow(clippy::cast_possible_truncation)] + fn write_coordinate_part(writer: &mut W, value: f32) -> io::Result<()> { + writer.write_all(&((value * 10_000_000.0) as i32).to_le_bytes()) + } +} + #[cfg(test)] mod tests { #![allow(clippy::unreadable_literal, clippy::float_cmp)] @@ -303,4 +345,31 @@ mod tests { )) ); } + + #[test] + fn write_header() { + let mut test = File::open(RASTER_FILE).unwrap(); + let mut header_bytes = [0; HEADER_SIZE]; + test.read_exact(header_bytes.as_mut_slice()).unwrap(); + let header = Header::try_from_bytes(Bytes::copy_from_slice(&header_bytes)).unwrap(); + + let mut buf = vec![]; + header.write_to(&mut buf).unwrap(); + let out = Header::try_from_bytes(Bytes::from(buf)).unwrap(); + assert_eq!(header.version, out.version); + assert_eq!(header.tile_type, out.tile_type); + assert_eq!(header.n_addressed_tiles, out.n_addressed_tiles); + assert_eq!(header.n_tile_entries, out.n_tile_entries); + assert_eq!(header.n_tile_contents, out.n_tile_contents); + assert_eq!(header.min_zoom, out.min_zoom); + assert_eq!(header.max_zoom, out.max_zoom); + assert_eq!(header.center_zoom, out.center_zoom); + assert_eq!(header.center_latitude, out.center_latitude); + assert_eq!(header.center_longitude, out.center_longitude); + assert_eq!(header.min_latitude, out.min_latitude); + assert_eq!(header.max_latitude, out.max_latitude); + assert_eq!(header.min_longitude, out.min_longitude); + assert_eq!(header.max_longitude, out.max_longitude); + assert_eq!(header.clustered, out.clustered); + } } diff --git a/src/lib.rs b/src/lib.rs index 855245d..cdf7485 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,8 @@ mod error; mod header; #[cfg(feature = "__async")] mod tile; +#[cfg(feature = "__writer")] +pub mod writer; #[cfg(feature = "aws-s3-async")] pub use backend_aws_s3::AwsS3Backend; diff --git a/src/writer.rs b/src/writer.rs new file mode 100644 index 0000000..001683a --- /dev/null +++ b/src/writer.rs @@ -0,0 +1,244 @@ +use std::fs::File; +use std::hash::{Hash, Hasher}; +use std::io::{self, BufWriter, Seek, Write}; + +use ahash::AHasher; +use flate2::write::GzEncoder; + +use crate::directory::{DirEntry, Directory}; +use crate::error::PmtResult; +use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES}; +use crate::{Compression, Header, TileType}; + +pub struct PmTilesWriter { + out: BufWriter, + header: Header, + entries: Vec, + n_addressed_tiles: u64, + last_tile_hash: u64, +} + +impl PmTilesWriter { + pub fn create(name: &str, tile_type: TileType, metadata: &str) -> PmtResult { + let file = File::create(name)?; + let mut out = BufWriter::new(file); + + // We use the following layout: + // +--------+----------------+----------+-----------+------------------+ + // | | | | | | + // | Header | Root Directory | Metadata | Tile Data | Leaf Directories | + // | | | | | | + // +--------+----------------+----------+-----------+------------------+ + // This allows writing without temporary files. But it requires Seek support. + + // Reserve space for header and root directory + out.write_all(&[0u8; MAX_INITIAL_BYTES])?; + + // let metadata_length = metadata.len() as u64; + // out.write_all(metadata.as_bytes())?; + let mut metadata_buf = vec![]; + { + let mut encoder = GzEncoder::new(&mut metadata_buf, flate2::Compression::default()); + encoder.write_all(metadata.as_bytes())?; + } + let metadata_length = metadata_buf.len() as u64; + out.write_all(&metadata_buf)?; + + let header = Header { + version: 3, + root_offset: HEADER_SIZE as u64, + root_length: 0, + metadata_offset: MAX_INITIAL_BYTES as u64, + metadata_length, + leaf_offset: 0, + leaf_length: 0, + data_offset: MAX_INITIAL_BYTES as u64 + metadata_length, + data_length: 0, + n_addressed_tiles: None, + n_tile_entries: None, + n_tile_contents: None, + clustered: true, + internal_compression: Compression::Gzip, // pmtiles extract does not support None compression + tile_compression: Compression::None, + tile_type, + min_zoom: 0, + max_zoom: 22, + min_longitude: -180.0, + min_latitude: -85.0, + max_longitude: 180.0, + max_latitude: 85.0, + center_zoom: 0, + center_longitude: 0.0, + center_latitude: 0.0, + }; + + Ok(Self { + out, + header, + entries: Vec::new(), + n_addressed_tiles: 0, + last_tile_hash: 0, + }) + } + + fn calculate_hash(value: &impl Hash) -> u64 { + let mut hasher = AHasher::default(); + value.hash(&mut hasher); + hasher.finish() + } + + /// Add tile to writer + /// Tiles are deduplicated and written to output. + /// `tile_id` should be increasing. + #[allow(clippy::missing_panics_doc)] + pub fn add_tile(&mut self, tile_id: u64, data: &[u8]) -> PmtResult<()> { + if data.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "A tile must have at least 1 byte of data.", + ) + .into()); + } + + let is_first = self.entries.is_empty(); + let mut first_entry = DirEntry { + tile_id: 0, + offset: 0, + length: 0, + run_length: 0, + }; + let last_entry = self.entries.last_mut().unwrap_or(&mut first_entry); + + self.n_addressed_tiles += 1; + let hash = Self::calculate_hash(&data); + if !is_first + && hash == self.last_tile_hash + && tile_id == last_entry.tile_id + u64::from(last_entry.run_length) + { + last_entry.run_length += 1; + } else { + let offset = last_entry.offset + u64::from(last_entry.length); + // Write tile + let length = data.len().try_into().expect("TODO: check max"); + self.out.write_all(data)?; + + self.entries.push(DirEntry { + tile_id, + run_length: 1, // Will be increased if the next tile is the same + offset, + length, + }); + + self.last_tile_hash = hash; + } + + Ok(()) + } + + /// Build root and leaf directories from entries. + /// Leaf directories are written to output. + /// The root directory is returned. + fn build_directories(&self) -> Directory { + let mut root_dir = Directory::default(); + for entry in &self.entries { + root_dir.push(entry.clone()); + } + // FIXME: check max size of root directory + // TODO: Build and write optimized leaf directories + root_dir + } + + #[allow(clippy::missing_panics_doc)] + pub fn finish(mut self) -> PmtResult<()> { + if let Some(last) = self.entries.last() { + self.header.data_length = last.offset + u64::from(last.length); + self.header.leaf_offset = self.header.data_offset + self.header.data_length; + self.header.n_addressed_tiles = self.n_addressed_tiles.try_into().ok(); + self.header.n_tile_entries = (self.entries.len() as u64).try_into().ok(); + self.header.n_tile_contents = None; //TODO + } + // Write leaf directories and get root directory + let root_dir = self.build_directories(); + // Determine compressed root directory length + let mut root_dir_buf = vec![]; + { + let mut encoder = GzEncoder::new(&mut root_dir_buf, flate2::Compression::default()); + root_dir.write_to(&mut encoder)?; + } + self.header.root_length = root_dir_buf.len() as u64; + + // Write header and root directory + self.out.rewind()?; + self.header.write_to(&mut self.out)?; + self.out.write_all(&root_dir_buf)?; + self.out.flush()?; + + Ok(()) + } +} + +#[cfg(test)] +#[cfg(feature = "mmap-async-tokio")] +mod tests { + use super::PmTilesWriter; + use crate::async_reader::AsyncPmTilesReader; + use crate::tests::RASTER_FILE; + use crate::MmapBackend; + use tempfile::NamedTempFile; + + fn get_temp_file_path(suffix: &str) -> std::io::Result { + let temp_file = NamedTempFile::with_suffix(suffix)?; + Ok(temp_file.path().to_string_lossy().into_owned()) + } + + #[tokio::test] + #[allow(clippy::float_cmp)] + async fn roundtrip_raster() { + let backend = MmapBackend::try_from(RASTER_FILE).await.unwrap(); + let tiles_in = AsyncPmTilesReader::try_from_source(backend).await.unwrap(); + let header_in = tiles_in.get_header(); + let metadata_in = tiles_in.get_metadata().await.unwrap(); + let num_tiles = header_in.n_addressed_tiles.unwrap(); + + let fname = get_temp_file_path("pmtiles").unwrap(); + // let fname = "test.pmtiles".to_string(); + let mut writer = PmTilesWriter::create(&fname, header_in.tile_type, &metadata_in).unwrap(); + for id in 0..num_tiles.into() { + let tile = tiles_in.get_tile_by_id(id).await.unwrap().unwrap(); + writer.add_tile(id, &tile).unwrap(); + } + writer.finish().unwrap(); + + let backend = MmapBackend::try_from(&fname).await.unwrap(); + let tiles_out = AsyncPmTilesReader::try_from_source(backend).await.unwrap(); + + // Compare headers + let header_out = tiles_out.get_header(); + // TODO: should be 3, but currently the ascii char 3, assert_eq!(header_in.version, header_out.version); + assert_eq!(header_in.tile_type, header_out.tile_type); + assert_eq!(header_in.n_addressed_tiles, header_out.n_addressed_tiles); + assert_eq!(header_in.n_tile_entries, header_out.n_tile_entries); + // TODO: assert_eq!(header_in.n_tile_contents, header_out.n_tile_contents); + assert_eq!(header_in.min_zoom, header_out.min_zoom); + // TODO: assert_eq!(header_in.max_zoom, header_out.max_zoom); + assert_eq!(header_in.center_zoom, header_out.center_zoom); + assert_eq!(header_in.center_latitude, header_out.center_latitude); + assert_eq!(header_in.center_longitude, header_out.center_longitude); + assert_eq!(header_in.min_latitude, header_out.min_latitude); + assert_eq!(header_in.max_latitude, header_out.max_latitude); + assert_eq!(header_in.min_longitude, header_out.min_longitude); + assert_eq!(header_in.max_longitude, header_out.max_longitude); + assert_eq!(header_in.clustered, header_out.clustered); + + // Compare metadata + let metadata_out = tiles_out.get_metadata().await.unwrap(); + assert_eq!(metadata_in, metadata_out); + + // Compare tiles + for (z, x, y) in [(0, 0, 0), (2, 2, 2), (3, 4, 5)] { + let tile_in = tiles_in.get_tile(z, x, y).await.unwrap().unwrap(); + let tile_out = tiles_out.get_tile(z, x, y).await.unwrap().unwrap(); + assert_eq!(tile_in.len(), tile_out.len()); + } + } +}