From 02d6e687c5dc55c1026ff4553344e7d18a201cfe Mon Sep 17 00:00:00 2001 From: Ellen Poe Date: Tue, 13 Feb 2024 20:46:45 -0800 Subject: [PATCH] New remote directory implementation with mmap and userfaultfd (#5) * Speed up queries for hot caches approx 10x with userfaultfd * madvise when tantivy slices our mmap * Super messy but seems to work now without huge pages * Fix to disregard the result of the uffd copy. More investigation needed. * Switch to sstable termdicts, hopefully play nice with memory pressure --- Cargo.toml | 4 + airmail/Cargo.toml | 5 +- airmail/src/directory.rs | 351 ------------------ airmail/src/directory/TANTIVY_AUTHORS | 11 + airmail/src/directory/mod.rs | 237 ++++++++++++ airmail/src/directory/query_len.rs | 48 +++ airmail/src/directory/uffd.rs | 235 ++++++++++++ airmail/src/directory/vec_writer.rs | 84 +++++ airmail/src/index.rs | 78 ++-- airmail_index/Cargo.toml | 3 + .../permute_dicts/en/street_prefixes.txt | 8 + airmail_index/src/bin/merge.rs | 23 ++ airmail_index/src/bin/query.rs | 48 +-- airmail_index/src/main.rs | 5 - airmail_index/src/openstreetmap.rs | 26 +- airmail_index/src/substitutions.rs | 24 +- airmail_service/Cargo.toml | 1 + airmail_service/src/main.rs | 69 ++-- airmail_site/src/components/Features.vue | 14 - 19 files changed, 786 insertions(+), 488 deletions(-) delete mode 100644 airmail/src/directory.rs create mode 100644 airmail/src/directory/TANTIVY_AUTHORS create mode 100644 airmail/src/directory/mod.rs create mode 100644 airmail/src/directory/query_len.rs create mode 100644 airmail/src/directory/uffd.rs create mode 100644 airmail/src/directory/vec_writer.rs create mode 100644 airmail_index/permute_dicts/en/street_prefixes.txt create mode 100644 airmail_index/src/bin/merge.rs diff --git a/Cargo.toml b/Cargo.toml index 69c510f..28db702 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,3 +3,7 @@ resolver = "2" members = [ "airmail", "airmail_common", "airmail_index", "airmail_parser", "airmail_service", "turbosm", ] + + +[profile.release] +debug = 1 \ No newline at end of file diff --git a/airmail/Cargo.toml b/airmail/Cargo.toml index d5aeb7f..c439946 100644 --- a/airmail/Cargo.toml +++ b/airmail/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] levenshtein_automata = "0.2.1" s2 = "0.0.12" -tantivy = "0.21.1" +tantivy = { version = "0.21.1", features = ["quickwit"] } tantivy-common = "0.6.0" tantivy-fst = "0.4.0" tempfile = "3.9.0" @@ -21,3 +21,6 @@ reqwest = { version = "0.11.24", features = ["blocking"] } lru = "0.12.2" async-trait = "0.1.77" tokio = { version = "1.36.0", features = ["full"] } +nix = { version = "0.27", features = ["ioctl", "mman"] } +userfaultfd = { version = "0.8.1", features = ["linux4_14", "linux5_7"] } +crossbeam = "0.8.4" diff --git a/airmail/src/directory.rs b/airmail/src/directory.rs deleted file mode 100644 index 45b8d50..0000000 --- a/airmail/src/directory.rs +++ /dev/null @@ -1,351 +0,0 @@ -use std::{ - collections::HashMap, - io::{self, Cursor, Seek, SeekFrom, Write}, - num::NonZeroUsize, - ops::Range, - path::{Path, PathBuf}, - sync::{Arc, Mutex, OnceLock}, - time::Duration, -}; - -use log::{error, info, warn}; -use lru::LruCache; -use tantivy::{ - directory::{ - error::{DeleteError, OpenReadError, OpenWriteError}, - WatchHandle, WritePtr, - }, - Directory, -}; -use tantivy_common::{file_slice::FileHandle, AntiCallToken, HasLen, OwnedBytes, TerminatingWrite}; - -thread_local! 
{ - static BLOCKING_HTTP_CLIENT: reqwest::blocking::Client = reqwest::blocking::Client::new(); - static HTTP_CLIENT: reqwest::Client = reqwest::Client::new(); -} - -const CHUNK_SIZE: usize = 1024 * 32; - -static LRU_CACHE: OnceLock>>> = OnceLock::new(); -static LENGTHS: OnceLock>> = OnceLock::new(); - -#[derive(Debug, Clone, Hash, Eq, PartialEq)] -pub struct CacheKey { - base_url: String, - path: String, - chunk: usize, -} - -#[derive(Debug, Clone)] -pub struct HttpFileHandle { - url: String, -} - -#[async_trait::async_trait] -impl FileHandle for HttpFileHandle { - fn read_bytes(&self, range: Range) -> std::io::Result { - let chunk_start = range.start / CHUNK_SIZE; - let chunk_end = range.end / CHUNK_SIZE; - let cache = LRU_CACHE - .get_or_init(|| Mutex::new(LruCache::new(NonZeroUsize::new(128 * 1024).unwrap()))); - - let mut have_all_chunks = true; - for chunk in chunk_start..=chunk_end { - let key = CacheKey { - base_url: self.url.clone(), - path: self.url.clone(), - chunk, - }; - let cache = cache.lock().unwrap(); - if !cache.contains(&key) { - have_all_chunks = false; - break; - } - } - let mut accumulated_chunks = Vec::new(); - if have_all_chunks { - info!("Reading bytes from cache: {:?}", range); - let mut cache = cache.lock().unwrap(); - for chunk in chunk_start..=chunk_end { - let key = CacheKey { - base_url: self.url.clone(), - path: self.url.clone(), - chunk, - }; - accumulated_chunks.extend(cache.get(&key).unwrap()); - } - let chunk_start_offset = range.start % CHUNK_SIZE; - let chunk_end_offset = (chunk_end - chunk_start) * CHUNK_SIZE + range.end % CHUNK_SIZE; - return Ok(OwnedBytes::new( - accumulated_chunks[chunk_start_offset..chunk_end_offset].to_vec(), - )); - } - - info!( - "Reading bytes: {:?} in chunks from {} to {}", - range, chunk_start, chunk_end - ); - let start_time = std::time::Instant::now(); - let response = BLOCKING_HTTP_CLIENT.with(|client| { - client - .get(&self.url) - .timeout(Duration::from_millis( - 500 + (range.end - range.start) as u64 / 1024, - )) - .header( - "Range", - dbg!(format!( - "bytes={}-{}", - chunk_start * CHUNK_SIZE, - (chunk_end + 1) * CHUNK_SIZE - )), - ) - .send() - }); - let response = if let Err(e) = response { - error!("Error: {:?}", e); - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Error fetching chunk", - )); - } else { - response.unwrap() - }; - if response.status() != 206 { - error!("Response: {:?}", response); - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Error fetching chunk: non-200 status", - )); - } else { - let data = response.bytes().unwrap(); - let data = data.to_vec(); - { - let mut cache = cache.lock().unwrap(); - for chunk in 0..=(chunk_end - chunk_start) { - let key = CacheKey { - base_url: self.url.clone(), - path: self.url.clone(), - chunk: chunk_start + chunk, - }; - let start = chunk * CHUNK_SIZE; - let end = (chunk + 1) * CHUNK_SIZE; - let data = data[start..end.min(data.len())].to_vec(); - cache.put(key, data); - } - } - accumulated_chunks.extend(data); - } - info!( - "Fetched {} bytes in: {:?}", - accumulated_chunks.len(), - start_time.elapsed() - ); - let chunk_start_offset = range.start % CHUNK_SIZE; - let chunk_end_offset = (chunk_end - chunk_start) * CHUNK_SIZE + range.end % CHUNK_SIZE; - Ok(OwnedBytes::new( - accumulated_chunks[chunk_start_offset..chunk_end_offset.min(accumulated_chunks.len())] - .to_vec(), - )) - } -} - -impl HasLen for HttpFileHandle { - fn len(&self) -> usize { - let lengths = LENGTHS.get_or_init(|| Mutex::new(HashMap::new())); - { - let lengths = 
lengths.lock().unwrap(); - if let Some(length) = lengths.get(&PathBuf::from(&self.url)) { - return *length; - } - } - - let url = format!("{}", self.url); - info!("Fetching length from: {}", url); - let response = BLOCKING_HTTP_CLIENT - .with(|client| client.head(&url).timeout(Duration::from_millis(500)).send()); - if let Err(e) = response { - error!("Error fetching length: {:?}", e); - panic!(); - } - let response = response.unwrap(); - if response.status() != 200 { - error!("Response: {:?}", response); - panic!(); - } else { - let length = response - .headers() - .get("Content-Length") - .unwrap() - .to_str() - .unwrap() - .parse() - .unwrap(); - info!("Length: {}", length); - let mut lengths = lengths.lock().unwrap(); - lengths.insert(PathBuf::from(&self.url), length); - length - } - } -} - -// impl Deref for HttpFileHandle { -// type Target = [u8]; - -// fn deref(&self) -> &Self::Target { -// warn!("Dereferencing an HttpFileHandle is not performant."); - -// } -// } - -#[derive(Debug, Clone)] -pub struct HttpDirectory { - base_url: String, -} - -impl HttpDirectory { - pub fn new(base_url: &str) -> Self { - Self { - base_url: base_url.to_string(), - } - } - - pub fn format_url(&self, path: &Path) -> String { - if self.base_url.ends_with('/') { - format!("{}{}", self.base_url, path.display()) - } else { - format!("{}/{}", self.base_url, path.display()) - } - } -} - -impl Directory for HttpDirectory { - fn get_file_handle(&self, path: &Path) -> Result, OpenReadError> { - Ok(Arc::new(HttpFileHandle { - url: self.format_url(path), - })) - } - - fn delete(&self, path: &Path) -> Result<(), DeleteError> { - if path == Path::new(".tantivy-meta.lock") { - return Ok(()); - } - - Err(DeleteError::IoError { - io_error: Arc::new(std::io::Error::new( - std::io::ErrorKind::Other, - "Delete not supported", - )), - filepath: path.to_path_buf(), - }) - } - - fn exists(&self, path: &Path) -> Result { - if path == Path::new(".tantivy-meta.lock") { - return Ok(true); - } - let handle = HttpFileHandle { - url: self.format_url(path), - }; - Ok(handle.len() > 0) - } - - fn open_write(&self, path: &Path) -> Result { - if path == Path::new(".tantivy-meta.lock") { - return Ok(WritePtr::new(Box::new(VecWriter::new(path.to_path_buf())))); - } - dbg!(path); - Err(OpenWriteError::IoError { - io_error: Arc::new(std::io::Error::new( - std::io::ErrorKind::Other, - "Write not supported", - )), - filepath: path.to_path_buf(), - }) - } - - fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { - let handle = HttpFileHandle { - url: self.format_url(path), - }; - Ok(handle - .read_bytes(0..handle.len()) - .map_err(|_| OpenReadError::IoError { - io_error: Arc::new(std::io::Error::new(std::io::ErrorKind::Other, "Read error")), - filepath: path.to_path_buf(), - })? - .to_vec()) - } - - fn atomic_write(&self, _path: &Path, _data: &[u8]) -> std::io::Result<()> { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Write not supported", - )) - } - - fn sync_directory(&self) -> std::io::Result<()> { - Ok(()) - } - - fn watch( - &self, - _watch_callback: tantivy::directory::WatchCallback, - ) -> tantivy::Result { - Ok(WatchHandle::empty()) - } -} - -struct VecWriter { - path: PathBuf, - data: Cursor>, - is_flushed: bool, -} - -impl VecWriter { - fn new(path_buf: PathBuf) -> VecWriter { - VecWriter { - path: path_buf, - data: Cursor::new(Vec::new()), - is_flushed: true, - } - } -} - -impl Drop for VecWriter { - fn drop(&mut self) { - if !self.is_flushed { - warn!( - "You forgot to flush {:?} before its writer got Drop. 
Do not rely on drop. This \ - also occurs when the indexer crashed, so you may want to check the logs for the \ - root cause.", - self.path - ) - } - } -} - -impl Seek for VecWriter { - fn seek(&mut self, pos: SeekFrom) -> io::Result { - self.data.seek(pos) - } -} - -impl Write for VecWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.is_flushed = false; - self.data.write_all(buf)?; - Ok(buf.len()) - } - - fn flush(&mut self) -> io::Result<()> { - self.is_flushed = true; - Ok(()) - } -} - -impl TerminatingWrite for VecWriter { - fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { - self.flush() - } -} diff --git a/airmail/src/directory/TANTIVY_AUTHORS b/airmail/src/directory/TANTIVY_AUTHORS new file mode 100644 index 0000000..dd5f0de --- /dev/null +++ b/airmail/src/directory/TANTIVY_AUTHORS @@ -0,0 +1,11 @@ +# This is the list of authors of tantivy for copyright purposes. +Paul Masurel +Laurentiu Nicola +Dru Sellers +Ashley Mannix +Michael J. Curry +Jason Wolfe +# As an employee of Google I am required to add Google LLC +# in the list of authors, but this project is not affiliated to Google +# in any other way. +Google LLC \ No newline at end of file diff --git a/airmail/src/directory/mod.rs b/airmail/src/directory/mod.rs new file mode 100644 index 0000000..4302e2b --- /dev/null +++ b/airmail/src/directory/mod.rs @@ -0,0 +1,237 @@ +mod query_len; +mod uffd; +mod vec_writer; + +use self::uffd::handle_uffd; +use crate::directory::{uffd::round_up_to_page, vec_writer::VecWriter}; +use log::info; +use nix::sys::mman::{mmap, MapFlags, ProtFlags}; +use std::{ + collections::HashMap, + ops::{Deref, Range}, + path::Path, + slice, + sync::{Arc, Mutex}, +}; +use tantivy::{ + directory::{ + error::{DeleteError, OpenReadError, OpenWriteError}, + WatchHandle, WritePtr, + }, + Directory, +}; +use tantivy_common::{file_slice::FileHandle, HasLen, OwnedBytes, StableDeref}; +use tokio::task::{spawn_blocking, JoinHandle}; +use userfaultfd::{FeatureFlags, UffdBuilder}; + +thread_local! 
{ + pub(crate) static BLOCKING_HTTP_CLIENT: reqwest::blocking::Client = reqwest::blocking::Client::new(); +} + +const CHUNK_SIZE: usize = 1 * 1024 * 1024; + +#[derive(Clone)] +struct MmapArc { + slice: &'static [u8], +} + +impl Deref for MmapArc { + type Target = [u8]; + + #[inline] + fn deref(&self) -> &[u8] { + self.slice + } +} +unsafe impl StableDeref for MmapArc {} + +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub struct CacheKey { + base_url: String, + path: String, + chunk: usize, +} + +#[derive(Debug, Clone)] +pub struct HttpFileHandle { + _ptr: usize, + owned_bytes: Arc, +} + +#[async_trait::async_trait] +impl FileHandle for HttpFileHandle { + fn read_bytes(&self, range: Range) -> std::io::Result { + Ok(self.owned_bytes.slice(range)) + } +} + +impl HasLen for HttpFileHandle { + fn len(&self) -> usize { + self.owned_bytes.len() + } +} + +#[derive(Debug, Clone)] +pub struct HttpDirectory { + base_url: String, + file_handle_cache: Arc, Arc)>>>, + atomic_read_cache: Arc>>>, +} + +impl HttpDirectory { + pub fn new(base_url: &str) -> Self { + Self { + base_url: base_url.to_string(), + file_handle_cache: Arc::new(Mutex::new(HashMap::new())), + atomic_read_cache: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn format_url(&self, path: &Path) -> String { + if self.base_url.ends_with('/') { + format!("{}{}", self.base_url, path.display()) + } else { + format!("{}/{}", self.base_url, path.display()) + } + } +} + +impl Directory for HttpDirectory { + fn get_file_handle(&self, path: &Path) -> Result, OpenReadError> { + let url = self.format_url(path); + { + let cache = self.file_handle_cache.lock().unwrap(); + if let Some((_, file_handle)) = cache.get(&url) { + return Ok(file_handle.clone()); + } + } + let file_len = query_len::len(&url); + let len = round_up_to_page(file_len); + + let uffd = UffdBuilder::new() + .close_on_exec(true) + .user_mode_only(true) + .require_features(FeatureFlags::MISSING_HUGETLBFS) + .create() + .unwrap(); + + let addr = unsafe { + mmap( + None, + len.try_into().unwrap(), + ProtFlags::PROT_READ | ProtFlags::PROT_WRITE, + MapFlags::MAP_PRIVATE | MapFlags::MAP_ANONYMOUS | MapFlags::MAP_NORESERVE, + None::, + 0, + ) + .expect("mmap") + }; + + let mmap_ptr = addr as usize; + + uffd.register(addr, len).unwrap(); + + let handle = { + let url = url.clone(); + spawn_blocking(move || { + handle_uffd(uffd, mmap_ptr, len, url); + }) + }; + let owned_bytes = Arc::new(OwnedBytes::new(MmapArc { + slice: unsafe { slice::from_raw_parts(mmap_ptr as *const u8, file_len) }, + })); + + let file_handle = Arc::new(HttpFileHandle { + _ptr: mmap_ptr, + owned_bytes, + }); + { + let mut cache = self.file_handle_cache.lock().unwrap(); + cache.insert(url, (handle, file_handle.clone())); + } + + Ok(file_handle) + } + + fn delete(&self, path: &Path) -> Result<(), DeleteError> { + if path == Path::new(".tantivy-meta.lock") { + return Ok(()); + } + + Err(DeleteError::IoError { + io_error: Arc::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Delete not supported", + )), + filepath: path.to_path_buf(), + }) + } + + fn exists(&self, path: &Path) -> Result { + if path == Path::new(".tantivy-meta.lock") { + return Ok(true); + } + Ok(query_len::len(&self.format_url(path)) > 0) + } + + fn open_write(&self, path: &Path) -> Result { + if path == Path::new(".tantivy-meta.lock") { + return Ok(WritePtr::new(Box::new(VecWriter::new(path.to_path_buf())))); + } + dbg!(path); + Err(OpenWriteError::IoError { + io_error: Arc::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Write not 
supported", + )), + filepath: path.to_path_buf(), + }) + } + + fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { + let url = self.format_url(path); + if let Some(bytes) = self.atomic_read_cache.lock().unwrap().get(&url) { + return Ok(bytes.clone()); + } + + info!("Fetching {} in atomic read.", url); + let response = BLOCKING_HTTP_CLIENT.with(|client| client.get(&url).send()); + let response = if let Err(_e) = response { + return Err(OpenReadError::IoError { + io_error: Arc::new(std::io::Error::new( + std::io::ErrorKind::Other, + "Fetch failed for atomic read.", + )), + filepath: path.to_path_buf(), + }); + } else { + response.unwrap() + }; + let bytes = response.bytes().unwrap(); + + let bytes = bytes.to_vec(); + self.atomic_read_cache + .lock() + .unwrap() + .insert(url, bytes.clone()); + Ok(bytes) + } + + fn atomic_write(&self, _path: &Path, _data: &[u8]) -> std::io::Result<()> { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Write not supported", + )) + } + + fn sync_directory(&self) -> std::io::Result<()> { + Ok(()) + } + + fn watch( + &self, + _watch_callback: tantivy::directory::WatchCallback, + ) -> tantivy::Result { + Ok(WatchHandle::empty()) + } +} diff --git a/airmail/src/directory/query_len.rs b/airmail/src/directory/query_len.rs new file mode 100644 index 0000000..f8b3f96 --- /dev/null +++ b/airmail/src/directory/query_len.rs @@ -0,0 +1,48 @@ +use std::{ + collections::HashMap, + path::PathBuf, + sync::{Mutex, OnceLock}, + time::Duration, +}; + +use log::{error, info}; + +use crate::directory::BLOCKING_HTTP_CLIENT; + +static LENGTHS: OnceLock>> = OnceLock::new(); + +pub(crate) fn len(url: &str) -> usize { + let lengths = LENGTHS.get_or_init(|| Mutex::new(HashMap::new())); + { + let lengths = lengths.lock().unwrap(); + if let Some(length) = lengths.get(&PathBuf::from(url)) { + return *length; + } + } + + info!("Fetching length from: {}", url); + let response = BLOCKING_HTTP_CLIENT + .with(|client| client.head(url).timeout(Duration::from_millis(500)).send()); + if let Err(e) = response { + error!("Error fetching length: {:?}", e); + panic!(); + } + let response = response.unwrap(); + if response.status() != 200 { + error!("Response: {:?}", response); + panic!(); + } else { + let length = response + .headers() + .get("Content-Length") + .unwrap() + .to_str() + .unwrap() + .parse() + .unwrap(); + info!("Length: {}", length); + let mut lengths = lengths.lock().unwrap(); + lengths.insert(PathBuf::from(url), length); + length + } +} diff --git a/airmail/src/directory/uffd.rs b/airmail/src/directory/uffd.rs new file mode 100644 index 0000000..b37ceb7 --- /dev/null +++ b/airmail/src/directory/uffd.rs @@ -0,0 +1,235 @@ +use std::{collections::HashSet, num::NonZeroUsize, os::raw::c_void, sync::Arc, time::Duration}; + +use log::{debug, error, info, warn}; +use lru::LruCache; +use nix::sys::mman::{madvise, MmapAdvise}; +use tokio::{ + spawn, + sync::{ + broadcast::{Receiver, Sender}, + Mutex, + }, +}; +use userfaultfd::{Event, Uffd}; + +use crate::directory::CHUNK_SIZE; + +thread_local! 
{ + pub(crate) static HTTP_CLIENT: reqwest::Client = reqwest::Client::new(); +} + +pub(crate) fn round_up_to_page(size: usize) -> usize { + (size + CHUNK_SIZE - 1) & !(CHUNK_SIZE - 1) +} + +async fn fetch_and_resume( + mmap_base_ptr: usize, + dst_ptr: usize, + chunk_idx: usize, + artifact_url: String, + uffd: Arc, + sender: Sender, + recent_chunks: Arc>>>, +) { + info!("Fetching chunk: {} from {}", chunk_idx, artifact_url); + let start_time = std::time::Instant::now(); + let byte_range = (chunk_idx * CHUNK_SIZE)..((chunk_idx + 1) * CHUNK_SIZE); + for attempt in 0..5 { + let response = HTTP_CLIENT + .with(|client| { + client + .get(&artifact_url) + .header( + "Range", + format!("bytes={}-{}", byte_range.start, byte_range.end - 1), + ) + .timeout(Duration::from_millis(3000)) + .send() + }) + .await; + if let Ok(response) = response { + if response.status().is_success() { + debug!( + "Success! Fetched chunk: {}-{} in {:?} and {} attempts", + byte_range.start, + byte_range.end, + start_time.elapsed(), + attempt + 1 + ); + let bytes = if let Ok(bytes) = response.bytes().await { + bytes.to_vec() + } else { + warn!("Failed to read response bytes"); + continue; + }; + let expected_len = byte_range.end - byte_range.start; + if bytes.len() > expected_len { + // This is weird and indicates a bug or malicious server. + info!( + "Expected {} bytes, got {}. Refusing to overflow chunk buffer.", + expected_len, + bytes.len() + ); + continue; + } + let bytes = if bytes.len() < expected_len { + // We need to extend the buffer to the expected size. + let mut extended = vec![0; expected_len]; + extended[..bytes.len()].copy_from_slice(&bytes); + extended + } else { + bytes + }; + debug_assert!(bytes.len() == expected_len); + debug_assert!(bytes.len() == CHUNK_SIZE); + + let offset = (dst_ptr - mmap_base_ptr) % CHUNK_SIZE; + debug_assert!(offset + 4096 <= bytes.len()); + unsafe { + let _ = uffd.copy( + bytes.as_ptr().add(offset) as *const c_void, + dst_ptr as *mut c_void, + 4096, + true, + ); + dont_need(dst_ptr as usize); + } + { + let mut recent_chunks = recent_chunks.lock().await; + recent_chunks.put(chunk_idx, bytes); + } + sender.send(chunk_idx).unwrap(); + return; + } + warn!( + "Failed to fetch chunk: {}-{}", + byte_range.start, byte_range.end + ); + } else { + warn!( + "Failed to fetch chunk: {}-{}: {:?}", + byte_range.start, byte_range.end, response + ); + } + } + error!( + "Critical: Failed to fetch chunk: {} after 5 attempts", + chunk_idx, + ); + // Find something better to do here maybe? + panic!(); +} + +fn dont_need(page_start: usize) { + // Round down to page size. 
+ unsafe { + madvise(page_start as *mut c_void, 4096, MmapAdvise::MADV_WILLNEED) + .expect("madvise failed"); + } +} + +pub(crate) fn handle_uffd(uffd: Uffd, mmap_start: usize, _len: usize, artifact_url: String) { + info!("Starting UFFD handler"); + let uffd = Arc::new(uffd); + let requested_pages = Arc::new(Mutex::new(HashSet::new())); + let chunk_cache: Arc>>> = + Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(64).unwrap()))); + let (sender, mut receiver): (Sender, Receiver) = + tokio::sync::broadcast::channel(100); + loop { + { + if let Ok(chunk) = receiver.try_recv() { + requested_pages.blocking_lock().remove(&chunk); + } + } + let event = uffd.read_event().unwrap(); + let event = if let Some(event) = event { + event + } else { + continue; + }; + + match event { + Event::Pagefault { + kind, + rw, + addr, + thread_id, + } => { + debug!("Pagefault: {:?} {:?} {:?} {:?}", kind, rw, addr, thread_id); + let offset = addr as usize - mmap_start; + let chunk_idx = offset / CHUNK_SIZE; + if let Some(chunk) = chunk_cache.blocking_lock().get(&chunk_idx) { + debug!("Using cached chunk: {}", chunk_idx); + let offset_into_chunk = offset % CHUNK_SIZE; + unsafe { + let _ = uffd.copy( + chunk.as_ptr().add(offset_into_chunk) as *const c_void, + addr as *mut c_void, + 4096, + true, + ); + dont_need(addr as usize); + } + continue; + } + + if requested_pages.blocking_lock().contains(&chunk_idx) { + debug!("Already requested chunk: {}", chunk_idx); + let uffd = uffd.clone(); + let requested_pages = requested_pages.clone(); + let mut receiver = receiver.resubscribe(); + let addr = addr as usize; + spawn(async move { + let start = std::time::Instant::now(); + loop { + if let Ok(chunk) = receiver.recv().await { + if chunk == chunk_idx { + break; + } + } + if start.elapsed() > Duration::from_secs(10) { + error!("Timeout waiting for chunk: {}", chunk_idx); + break; + } + if !requested_pages.lock().await.contains(&chunk_idx) { + warn!("Chunk: {} is no longer requested, but we missed the message that it was found.", chunk_idx); + break; + } + } + + // Wake the process, and we'll handle the page fault again if need be. + uffd.wake(addr as *mut c_void, 4096).unwrap(); + }); + continue; + } + debug!("Requesting chunk: {}", chunk_idx); + requested_pages.blocking_lock().insert(chunk_idx); + let artifact_url = artifact_url.clone(); + let uffd = uffd.clone(); + spawn(fetch_and_resume( + mmap_start, + addr as usize, + chunk_idx, + artifact_url, + uffd, + sender.clone(), + chunk_cache.clone(), + )); + } + Event::Fork { uffd } => { + info!("Fork: {:?}", uffd); + } + Event::Remap { from, to, len } => { + info!("Remap: {:?} - {:?}, len {:?}", from, to, len); + } + Event::Remove { start, end } => { + info!("Remove: {:?} - {:?}", start, end); + } + Event::Unmap { start, end } => { + info!("Unmap: {:?} - {:?}, stopping UFFD handler", start, end); + return; + } + } + } +} diff --git a/airmail/src/directory/vec_writer.rs b/airmail/src/directory/vec_writer.rs new file mode 100644 index 0000000..0c7d7e8 --- /dev/null +++ b/airmail/src/directory/vec_writer.rs @@ -0,0 +1,84 @@ +// This code is taken directly from tantivy. Copyright notice preserved as is, +// but the authors file has been renamed to TANTIVY_AUTHORS. + +// Copyright (c) 2018 by the project authors, as listed in the AUTHORS file. 
+ +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +use std::{ + io::{self, Cursor, Seek, SeekFrom, Write}, + path::PathBuf, +}; + +use log::warn; +use tantivy_common::{AntiCallToken, TerminatingWrite}; + +pub(crate) struct VecWriter { + path: PathBuf, + data: Cursor>, + is_flushed: bool, +} + +impl VecWriter { + pub(crate) fn new(path_buf: PathBuf) -> VecWriter { + VecWriter { + path: path_buf, + data: Cursor::new(Vec::new()), + is_flushed: true, + } + } +} + +impl Drop for VecWriter { + fn drop(&mut self) { + if !self.is_flushed { + warn!( + "You forgot to flush {:?} before its writer got Drop. Do not rely on drop. This \ + also occurs when the indexer crashed, so you may want to check the logs for the \ + root cause.", + self.path + ) + } + } +} + +impl Seek for VecWriter { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.data.seek(pos) + } +} + +impl Write for VecWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.is_flushed = false; + self.data.write_all(buf)?; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + self.is_flushed = true; + Ok(()) + } +} + +impl TerminatingWrite for VecWriter { + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + self.flush() + } +} diff --git a/airmail/src/index.rs b/airmail/src/index.rs index 3e42808..e043e02 100644 --- a/airmail/src/index.rs +++ b/airmail/src/index.rs @@ -1,13 +1,19 @@ +use std::sync::Arc; + use airmail_parser::{component::QueryComponentType, query::QueryScenario}; use serde_json::Value; use tantivy::{ collector::TopDocs, directory::MmapDirectory, - query::{BooleanQuery, FuzzyTermQuery, Query}, - schema::{FacetOptions, Schema, TextFieldIndexing, TextOptions, INDEXED, STORED, TEXT}, + query::{BooleanQuery, Query, TermQuery}, + schema::{ + FacetOptions, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, INDEXED, STORED, + TEXT, + }, tokenizer::{LowerCaser, RawTokenizer, TextAnalyzer}, Term, }; +use tokio::task::spawn_blocking; use crate::{directory::HttpDirectory, poi::AirmailPoi}; @@ -24,8 +30,9 @@ pub const FIELD_COUNTRY: &str = "country"; pub const FIELD_S2CELL: &str = "s2cell"; pub const FIELD_TAGS: &str = "tags"; +#[derive(Clone)] pub struct AirmailIndex { - tantivy_index: tantivy::Index, + tantivy_index: Arc, } fn query_for_terms( @@ -36,21 +43,16 @@ fn query_for_terms( let mut queries: Vec> = Vec::new(); let mut phrase = Vec::new(); for (i, term) in terms.iter().enumerate() { - if term.len() < 2 { - continue; - } 
phrase.push(Term::from_field_text(field, term)); if i == terms.len() - 1 && is_prefix { - queries.push(Box::new(FuzzyTermQuery::new_prefix( + queries.push(Box::new(TermQuery::new( Term::from_field_text(field, term), - 0, - true, + IndexRecordOption::Basic, ))); } else { - queries.push(Box::new(FuzzyTermQuery::new( + queries.push(Box::new(TermQuery::new( Term::from_field_text(field, term), - 0, - true, + IndexRecordOption::Basic, ))); }; } @@ -138,7 +140,9 @@ impl AirmailIndex { tantivy_index .tokenizers() .register("street_tokenizer", street_tokenizer); - Ok(Self { tantivy_index }) + Ok(Self { + tantivy_index: Arc::new(tantivy_index), + }) } pub fn new(index_dir: &str) -> Result> { @@ -149,7 +153,9 @@ impl AirmailIndex { tantivy_index .tokenizers() .register("street_tokenizer", street_tokenizer); - Ok(Self { tantivy_index }) + Ok(Self { + tantivy_index: Arc::new(tantivy_index), + }) } pub fn new_remote(base_url: &str) -> Result> { @@ -160,7 +166,9 @@ impl AirmailIndex { tantivy_index .tokenizers() .register("street_tokenizer", street_tokenizer); - Ok(Self { tantivy_index }) + Ok(Self { + tantivy_index: Arc::new(tantivy_index), + }) } pub fn writer(&mut self) -> Result> { @@ -178,7 +186,19 @@ impl AirmailIndex { Ok(()) } - pub fn search( + pub async fn num_docs(&self) -> Result> { + let index = self.tantivy_index.clone(); + let count = spawn_blocking(move || { + if let Ok(tantivy_reader) = index.reader() { + Some(tantivy_reader.searcher().num_docs()) + } else { + None + } + }); + Ok(count.await?.ok_or("Error getting count")?) + } + + pub async fn search( &self, query: &QueryScenario, ) -> Result, Box> { @@ -211,19 +231,10 @@ impl AirmailIndex { } QueryComponentType::RoadComponent => { - if is_prefix { - queries.push(Box::new(FuzzyTermQuery::new_prefix( - Term::from_field_text(self.field_road(), component.text()), - 0, - true, - ))); - } else { - queries.push(Box::new(FuzzyTermQuery::new( - Term::from_field_text(self.field_road(), component.text()), - 0, - true, - ))); - } + queries.push(Box::new(TermQuery::new( + Term::from_field_text(self.field_road(), component.text()), + IndexRecordOption::Basic, + ))); } QueryComponentType::IntersectionComponent => { @@ -270,12 +281,13 @@ impl AirmailIndex { } } } - let query = BooleanQuery::intersection(queries); - let top_docs = searcher.search(&query, &TopDocs::with_limit(10))?; + let (top_docs, searcher) = + spawn_blocking(move || (searcher.search(&query, &TopDocs::with_limit(10)), searcher)) + .await?; let mut results = Vec::new(); - for (score, doc_address) in top_docs { - let doc = searcher.doc(doc_address)?; + for (score, doc_id) in top_docs? 
{ + let doc = searcher.doc(doc_id)?; let house_num: Option<&str> = doc .get_first(self.field_house_number()) .map(|v| v.as_text()) diff --git a/airmail_index/Cargo.toml b/airmail_index/Cargo.toml index 03f8331..1165c92 100644 --- a/airmail_index/Cargo.toml +++ b/airmail_index/Cargo.toml @@ -35,3 +35,6 @@ lru = "0.12.2" [[bin]] name = "query" + +[[bin]] +name = "merge" diff --git a/airmail_index/permute_dicts/en/street_prefixes.txt b/airmail_index/permute_dicts/en/street_prefixes.txt new file mode 100644 index 0000000..5e60cbe --- /dev/null +++ b/airmail_index/permute_dicts/en/street_prefixes.txt @@ -0,0 +1,8 @@ +north|n| +south|s| +east|e| +west|w| +northwest|nw| +northeast|ne| +southwest|sw| +southeast|se| \ No newline at end of file diff --git a/airmail_index/src/bin/merge.rs b/airmail_index/src/bin/merge.rs new file mode 100644 index 0000000..2f9d2fe --- /dev/null +++ b/airmail_index/src/bin/merge.rs @@ -0,0 +1,23 @@ +use airmail::index::AirmailIndex; +use clap::Parser; + +#[derive(Debug, Parser)] +struct Args { + #[clap(long, short)] + index: String, + #[clap(long, short)] + merge: bool, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + println!("{:?}", args); + let mut index = AirmailIndex::new(&args.index)?; + if args.merge { + index.merge().await?; + } else { + println!("Pass the --merge arg if you're sure"); + } + Ok(()) +} diff --git a/airmail_index/src/bin/query.rs b/airmail_index/src/bin/query.rs index e9d2d4d..8706760 100644 --- a/airmail_index/src/bin/query.rs +++ b/airmail_index/src/bin/query.rs @@ -1,6 +1,8 @@ -use airmail::index::AirmailIndex; +use airmail::{index::AirmailIndex, poi::AirmailPoi}; use clap::Parser; +use futures_util::future::join_all; use rustyline::DefaultEditor; +use tokio::spawn; #[derive(Debug, Parser)] struct Args { @@ -22,29 +24,29 @@ async fn main() -> Result<(), Box> { let parsed = airmail_parser::query::Query::parse(&query); let scenarios = parsed.scenarios(); - let results: Option> = scenarios - .iter() - .take(10) - .filter_map(|scenario| { - let results = index.search(scenario).unwrap(); - if results.is_empty() { - None - } else { - dbg!(scenario); - Some(results) - } - }) - .next(); + let mut scaled_results: Vec>> = Vec::new(); + for scenario in scenarios.into_iter().take(3) { + let index = index.clone(); + scaled_results.push(spawn(async move { + let docs = index.search(&scenario).await.unwrap(); + let docs = docs + .into_iter() + .map(|(poi, score)| (poi, scenario.penalty_mult() * score)) + .collect::>(); + docs + })); + } + let mut results: Vec<(AirmailPoi, f32)> = join_all(scaled_results) + .await + .into_iter() + .flatten() + .flatten() + .collect::>(); - println!(); - if let Some(results) = results { - for result in &results { - println!(" - {:?}", result); - } - println!("{} results found in {:?}", results.len(), start.elapsed()); - } else { - println!("No results found in {:?}.", start.elapsed()); + results.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap()); + for (poi, score) in results.iter().take(10) { + println!("{:?} {}", poi, score); } - println!(); + println!("{} results found in {:?}", results.len(), start.elapsed()); } } diff --git a/airmail_index/src/main.rs b/airmail_index/src/main.rs index 9108ece..076648d 100644 --- a/airmail_index/src/main.rs +++ b/airmail_index/src/main.rs @@ -458,10 +458,5 @@ async fn main() -> Result<(), Box> { indexing_join_handle.await.unwrap(); } - println!("Done. 
Merging segments."); - let index_path = args.index.clone(); - let mut index = airmail::index::AirmailIndex::new(&index_path)?; - index.merge().await?; - Ok(()) } diff --git a/airmail_index/src/openstreetmap.rs b/airmail_index/src/openstreetmap.rs index 9c1977f..608d2e3 100644 --- a/airmail_index/src/openstreetmap.rs +++ b/airmail_index/src/openstreetmap.rs @@ -110,7 +110,7 @@ fn index_way(tags: &HashMap, way: &Way) -> Option { tags_to_poi(&tags, lat, lng) } -fn relation_centroid( +fn _relation_centroid( relation: &Relation, level: u32, turbosm: &Turbosm, @@ -132,7 +132,7 @@ fn relation_centroid( } RelationMember::Relation(_, other_relation) => { let other_relation = turbosm.relation(*other_relation)?; - if let Ok(centroid) = relation_centroid(&other_relation, level + 1, turbosm) { + if let Ok(centroid) = _relation_centroid(&other_relation, level + 1, turbosm) { points.push(Point::new(centroid.0, centroid.1)); } else { debug!("Skipping relation with no centroid: {:?}", relation.id()); @@ -167,17 +167,17 @@ pub fn parse_osm Result<(), Box Result, Box> { let road = sanitize(road); let mut permutations = vec![road.clone()]; // This may be a bad way of handling it, I don't know enough about non-ascii whitespace to be sure. - let road_components: Vec> = road + let suffix_components: Vec> = road .split_whitespace() .map(|s| STREET_SUFFIXES_SUBS.substitute(s)) .collect(); + let prefix_components: Vec> = road + .split_whitespace() + .map(|s| STREET_PREFIXES_SUBS.substitute(s)) + .collect(); + debug_assert!(suffix_components.len() == prefix_components.len()); + let components_len = suffix_components.len(); let mut found_suffix = false; - for i in 0..=road_components.len() { - let base_substrings = permute("", &road_components[0..i]); - let suffix_substrings = permute("", &road_components[i..]); + for i in 0..=components_len { + let base_suffix_substrings = permute("", &suffix_components[0..i]); + let suffix_substrings = permute("", &suffix_components[i..]); if !found_suffix { - for substring_pair in base_substrings.iter().zip(suffix_substrings.iter()) { + for substring_pair in base_suffix_substrings.iter().zip(suffix_substrings.iter()) { let suffix_substring = substring_pair.1.clone(); if search_fst(street_suffixes_fst(), suffix_substring.clone(), 0, false) { found_suffix = true; @@ -95,8 +104,11 @@ pub(super) fn permute_road(road: &str) -> Result, Box> { } if found_suffix { - permutations.extend(base_substrings.iter().cloned()); + permutations.extend(base_suffix_substrings); } + // If we found a way to permute the prefix, we should include it in the permutations. 
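+        // e.g. with the new street_prefixes.txt dict, a road like "northwest elm"
+        // also yields an "nw elm" permutation for indexing (illustrative example).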
+ let prefix_substrings = permute("", &prefix_components[0..i]); + permutations.extend(prefix_substrings); } Ok(permutations) } diff --git a/airmail_service/Cargo.toml b/airmail_service/Cargo.toml index 4ecbbf4..18dce56 100644 --- a/airmail_service/Cargo.toml +++ b/airmail_service/Cargo.toml @@ -16,3 +16,4 @@ log = "0.4.20" clap = { version = "4.4.18", features = ["derive"] } serde = { version = "1", features = ["derive"] } serde_json = "1" +futures-util = "0.3.30" diff --git a/airmail_service/src/main.rs b/airmail_service/src/main.rs index 2f3af74..33f02f3 100644 --- a/airmail_service/src/main.rs +++ b/airmail_service/src/main.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, error::Error, sync::Arc}; use airmail::{index::AirmailIndex, poi::AirmailPoi}; -use airmail_parser::query::QueryScenario; use axum::{ extract::{Query, State}, routing::get, @@ -9,10 +8,10 @@ use axum::{ }; use clap::Parser; use deunicode::deunicode; -use log::trace; +use futures_util::future::join_all; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::task::spawn_blocking; +use tokio::{spawn, task::spawn_blocking}; #[derive(Debug, Parser)] struct Args { @@ -31,56 +30,41 @@ async fn search( State(index): State>, ) -> Json { let query = params.get("q").unwrap(); - trace!("searching for {:?}", query); let query = deunicode(query.trim()).to_lowercase(); + + let start = std::time::Instant::now(); let parsed = airmail_parser::query::Query::parse(&query); let scenarios = parsed.scenarios(); - let start = std::time::Instant::now(); - let mut all_results: Vec<(AirmailPoi, f32, QueryScenario)> = vec![]; - for scenario in scenarios.iter().take(3) { - if all_results.len() > 20 { - break; - } - let results = { - let scenario = scenario.clone(); - let index = index.clone(); - spawn_blocking(move || index.search(&scenario).unwrap()) - .await - .unwrap() - }; - if results.is_empty() { - continue; - } else { - all_results.extend( - results - .iter() - .map(|(poi, score)| { - ( - poi.clone(), - *score * scenario.penalty_mult(), - scenario.clone(), - ) - }) - .collect::>(), - ); - } + let mut scaled_results: Vec>> = Vec::new(); + for scenario in scenarios.into_iter().take(3) { + let index = index.clone(); + scaled_results.push(spawn(async move { + let docs = index.search(&scenario).await.unwrap(); + let docs = docs + .into_iter() + .map(|(poi, score)| (poi, scenario.penalty_mult() * score)) + .collect::>(); + docs + })); } + let mut results: Vec<(AirmailPoi, f32)> = join_all(scaled_results) + .await + .into_iter() + .flatten() + .flatten() + .collect::>(); - all_results.sort_by(|(_, a, _), (_, b, _)| b.partial_cmp(a).unwrap()); + results.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap()); - println!( - "{} results found in {:?}", - all_results.len(), - start.elapsed() - ); + println!("{} results found in {:?}", results.len(), start.elapsed()); let mut response = Response { metadata: HashMap::new(), - features: all_results + features: results .clone() - .iter() - .map(|(results, _, _)| results.clone()) + .into_iter() + .map(|(results, _)| results.clone()) .collect::>(), }; @@ -106,6 +90,7 @@ async fn main() -> Result<(), Box> { }) .await .unwrap(); + println!("Have {} docs", index.num_docs().await?); let app = Router::new().route("/search", get(search).with_state(index)); let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); axum::serve(listener, app).await.unwrap(); diff --git a/airmail_site/src/components/Features.vue b/airmail_site/src/components/Features.vue index 7b19f2c..1039492 
100644 --- a/airmail_site/src/components/Features.vue +++ b/airmail_site/src/components/Features.vue @@ -16,20 +16,6 @@ const items = [ icon: "mdi-airplane", color: "background-color: #ffc93c", }, - { - id: 3, - name: "Autocomplete", - des: "Supports prefix searching and autocomplete.", - icon: "mdi-redo", - color: "background-color: #f73859", - }, - { - id: 4, - name: "Batteries-included", - des: "Easy-ish setup, whether you're bringing your own data or using an existing OpenStreetMap index.", - icon: "mdi-battery", - color: "background-color: #7f26bf", - }, ]; const total = 4;
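
For reviewers, a minimal sketch of how the pieces added here fit together once an index has been published at some HTTP base URL. The URL and query string below are placeholders and error handling is elided; search() and num_docs() are now async, so callers need a tokio runtime. This mirrors airmail_index/src/bin/query.rs and airmail_service/src/main.rs rather than adding anything new:

    use airmail::index::AirmailIndex;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Pages of the remote index are mmap'd lazily and faulted in over HTTP via userfaultfd.
        let index = AirmailIndex::new_remote("https://example.com/airmail-index/")?;
        println!("Have {} docs", index.num_docs().await?);

        let parsed = airmail_parser::query::Query::parse("123 main st");
        for scenario in parsed.scenarios().into_iter().take(3) {
            // search() is now async; scenarios can also be spawned concurrently as in query.rs.
            for (poi, score) in index.search(&scenario).await? {
                println!("{:?} {}", poi, score * scenario.penalty_mult());
            }
        }
        Ok(())
    }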