Skip to content

Commit

Permalink
Merge pull request #4 from disasterscience/structure
Browse files Browse the repository at this point in the history
Structure
  • Loading branch information
tristan-morris authored Jul 29, 2024
2 parents 1c32229 + 9ec7c9e commit c3330a2
Show file tree
Hide file tree
Showing 17 changed files with 808 additions and 314 deletions.
176 changes: 121 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion airmail/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,4 @@ anyhow = "1.0.86"
thiserror = "1.0.63"

[features]
invasive_logging = []
remote_index = ["tantivy/quickwit"]
135 changes: 65 additions & 70 deletions airmail/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::path::PathBuf;
use std::path::Path;
use std::sync::Arc;

use anyhow::Result;
use futures_util::future::join_all;
use geo::Rect;
use itertools::Itertools;
use log::debug;
use log::{trace, warn};
use s2::region::RegionCoverer;
use std::collections::BTreeMap;
use tantivy::schema::Value;
Expand Down Expand Up @@ -112,7 +111,7 @@ impl AirmailIndex {
self.tantivy_index.schema().get_field(FIELD_TAGS).unwrap()
}

pub fn create(index_dir: &PathBuf) -> Result<Self> {
pub fn create(index_dir: &Path) -> Result<Self> {
let schema = Self::schema();
let tantivy_index =
tantivy::Index::open_or_create(MmapDirectory::open(index_dir)?, schema)?;
Expand Down Expand Up @@ -346,70 +345,66 @@ impl AirmailIndex {
let query_string = query.trim().replace("'s", "s");

let start = std::time::Instant::now();
let (top_docs, searcher) = {
let query = self
.construct_query(
&searcher,
&query_string,
tags,
bbox,
boost_regions,
request_leniency,
)
.await;

#[cfg(feature = "invasive_logging")]
{
dbg!(&query);

let query = self
.construct_query(
&searcher,
&query_string,
tags,
bbox,
boost_regions,
request_leniency,
)
.await;

trace!("Search query: {:?}", &query);

// Perform the search and then resolve the returned documents
let top_docs: Result<Vec<(f32, TantivyDocument)>> = spawn_blocking(move || {
let doc_addresses = searcher.search(&query, &TopDocs::with_limit(10))?;
let mut docs = vec![];
for (score, doc_address) in doc_addresses {
if let Ok(doc) = searcher.doc::<TantivyDocument>(doc_address) {
docs.push((score, doc));
}
}

let (top_docs, searcher) = spawn_blocking(move || {
(searcher.search(&query, &TopDocs::with_limit(10)), searcher)
})
.await?;
let top_docs = top_docs?;
debug!(
"Search took {:?} and yielded {} results",
start.elapsed(),
top_docs.len()
);
(top_docs, searcher)
};
Ok(docs)
})
.await?;

let mut scores = Vec::new();
let mut futures = Vec::new();
for (score, doc_id) in top_docs {
let searcher = searcher.clone();
let doc = spawn_blocking(move || searcher.doc::<TantivyDocument>(doc_id));
scores.push(score);
futures.push(doc);
}
let mut results = Vec::new();
let top_docs = join_all(futures).await;
for (score, doc_future) in scores.iter().zip(top_docs) {
let doc = doc_future??;
let source = doc
.get_first(self.field_source())
.map(|value| value.as_str().unwrap().to_string())
.unwrap_or_default();
let s2cell = doc
.get_first(self.field_s2cell())
.unwrap()
.as_u64()
.unwrap();
let cellid = s2::cellid::CellID(s2cell);
let latlng = s2::latlng::LatLng::from(cellid);
let tags: Vec<(String, String)> = doc
.get_first(self.field_tags())
.unwrap()
.as_object()
.unwrap()
.map(|(k, v)| (k.to_string(), v.as_str().unwrap().to_string()))
.collect();

let poi = AirmailPoi::new(source, latlng.lat.deg(), latlng.lng.deg(), tags)?;
results.push((poi, *score));
}
let top_docs = top_docs.map_err(|e| {
warn!("Search failed: {:?}", e);
e
})?;

trace!(
"Search took {:?} and yielded {} results",
start.elapsed(),
top_docs.len()
);

let results = top_docs
.into_iter()
.flat_map(|(score, doc)| {
let source = doc
.get_first(self.field_source())
.map(|value| value.as_str().unwrap_or_default().to_string())
.unwrap_or_default();
let s2cell = doc.get_first(self.field_s2cell())?.as_u64()?;
let cellid = s2::cellid::CellID(s2cell);
let latlng = s2::latlng::LatLng::from(cellid);
let tags: Vec<(String, String)> = doc
.get_first(self.field_tags())?
.as_object()?
.map(|(k, v)| (k.to_string(), v.as_str().unwrap_or_default().to_string()))
.collect();

AirmailPoi::new(source, latlng.lat.deg(), latlng.lng.deg(), tags)
.ok()
.map(|poi| (poi, score))
})
.collect::<Vec<_>>();

Ok(results)
}
Expand All @@ -430,7 +425,7 @@ impl AirmailIndexWriter {
for content in poi.content {
self.process_field(&mut doc, &content);
}
doc.add_text(self.schema.get_field(FIELD_SOURCE).unwrap(), source);
doc.add_text(self.schema.get_field(FIELD_SOURCE)?, source);

let indexed_keys = [
"natural", "amenity", "shop", "leisure", "tourism", "historic", "cuisine",
Expand All @@ -443,22 +438,22 @@ impl AirmailIndexWriter {
.any(|prefix| key.starts_with(prefix))
{
doc.add_text(
self.schema.get_field(FIELD_INDEXED_TAG).unwrap(),
self.schema.get_field(FIELD_INDEXED_TAG)?,
format!("{}={}", key, value).as_str(),
);
}
}
doc.add_object(
self.schema.get_field(FIELD_TAGS).unwrap(),
self.schema.get_field(FIELD_TAGS)?,
poi.tags
.iter()
.map(|(k, v)| (k.to_string(), OwnedValue::Str(v.to_string())))
.collect::<BTreeMap<String, OwnedValue>>(),
);

doc.add_u64(self.schema.get_field(FIELD_S2CELL).unwrap(), poi.s2cell);
doc.add_u64(self.schema.get_field(FIELD_S2CELL)?, poi.s2cell);
for parent in poi.s2cell_parents {
doc.add_u64(self.schema.get_field(FIELD_S2CELL_PARENTS).unwrap(), parent);
doc.add_u64(self.schema.get_field(FIELD_S2CELL_PARENTS)?, parent);
}
self.tantivy_writer.add_document(doc)?;

Expand Down
3 changes: 3 additions & 0 deletions airmail/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#![forbid(unsafe_code)]
#![warn(clippy::missing_panics_doc)]

#[macro_use]
extern crate lazy_static;

Expand Down
10 changes: 7 additions & 3 deletions airmail_indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ lingua = { version = "1.6.2", default-features = false, features = [
] }
redb = "1.5.0"
anyhow = "1.0.86"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite"] }
osmx = { version = "0.1.0", optional = true }
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] }
osmx = { version = "0.1.0" }
thiserror = "1.0.63"
rstar = { version = "0.12.0", features = ["serde"] }
geo-types = { version = "0.7.11", features = ["use-rstar_0_12"] }
bincode = { version = "1.3.3" }
geozero = { version = "0.13.0", features = ["with-geo", "with-gpkg"] }

[features]
default = ["remote_index", "dep:osmx"]
default = ["remote_index"]
remote_index = ["airmail/remote_index"]
82 changes: 59 additions & 23 deletions airmail_indexer/src/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,44 @@ use std::{
use tokio::{spawn, task::spawn_blocking};

use crate::{
populate_admin_areas, wof::WhosOnFirst, WofCacheItem, TABLE_AREAS, TABLE_LANGS, TABLE_NAMES,
pip_tree::PipTree,
populate_admin_areas,
wof::{ConcisePipResponse, WhosOnFirst},
WofCacheItem, TABLE_AREAS, TABLE_LANGS, TABLE_NAMES,
};

pub struct ImporterBuilder {
admin_cache: Option<PathBuf>,
index: AirmailIndex,
admin_cache_path: Option<PathBuf>,
wof_db_path: PathBuf,
pip_tree_path: Option<PathBuf>,
}

impl ImporterBuilder {
pub fn new(whosonfirst_spatialite_path: &Path) -> Self {
Self {
admin_cache: None,
wof_db_path: whosonfirst_spatialite_path.to_path_buf(),
}
pub fn new(airmail_index_path: &Path, wof_db_path: &Path) -> Result<Self> {
// Create the index
let index = AirmailIndex::create(airmail_index_path)?;

Ok(Self {
index,
admin_cache_path: None,
wof_db_path: wof_db_path.to_path_buf(),
pip_tree_path: None,
})
}

pub fn admin_cache(mut self, admin_cache: &Path) -> Self {
self.admin_cache = Some(admin_cache.to_path_buf());
self.admin_cache_path = Some(admin_cache.to_path_buf());
self
}

pub fn pip_tree_cache(mut self, pip_tree_cache: &Path) -> Self {
self.pip_tree_path = Some(pip_tree_cache.to_path_buf());
self
}

pub async fn build(self) -> Result<Importer> {
let admin_cache = if let Some(admin_cache) = self.admin_cache {
let admin_cache = if let Some(admin_cache) = self.admin_cache_path {
admin_cache
} else {
std::env::temp_dir().join("admin_cache.db")
Expand All @@ -54,27 +69,40 @@ impl ImporterBuilder {

let wof_db = WhosOnFirst::new(&self.wof_db_path).await?;

Ok(Importer {
admin_cache: Arc::new(db),
wof_db,
})
let pip_tree = if let Some(pip_tree_cache) = self.pip_tree_path {
Some(PipTree::new_or_load(&wof_db, &pip_tree_cache).await?)
} else {
None
};

Importer::new(self.index, db, wof_db, pip_tree).await
}
}

pub struct Importer {
index: AirmailIndex,
admin_cache: Arc<Database>,
wof_db: WhosOnFirst,
pip_tree: Option<PipTree<ConcisePipResponse>>,
}

impl Importer {
pub async fn run_import(
&self,
mut index: AirmailIndex,
source: &str,
receiver: Receiver<ToIndexPoi>,
) -> Result<()> {
pub async fn new(
index: AirmailIndex,
admin_cache: Database,
wof_db: WhosOnFirst,
pip_tree: Option<PipTree<ConcisePipResponse>>,
) -> Result<Self> {
Ok(Self {
index,
admin_cache: Arc::new(admin_cache),
wof_db,
pip_tree,
})
}

pub async fn run_import(mut self, source: &str, receiver: Receiver<ToIndexPoi>) -> Result<()> {
let source = source.to_string();
// let mut nonblocking_join_handles = Vec::new();
let (to_cache_sender, to_cache_receiver): (Sender<WofCacheItem>, Receiver<WofCacheItem>) =
crossbeam::channel::bounded(1024);
let (to_index_sender, to_index_receiver): (Sender<SchemafiedPoi>, Receiver<SchemafiedPoi>) =
Expand Down Expand Up @@ -116,10 +144,11 @@ impl Importer {
}
}));

let mut writer = self.index.writer()?;

handles.push(spawn_blocking(move || {
let start = std::time::Instant::now();

let mut writer = index.writer().unwrap();
let mut count = 0;
loop {
{
Expand Down Expand Up @@ -152,6 +181,7 @@ impl Importer {
let to_cache_sender = to_cache_sender.clone();
let admin_cache = self.admin_cache.clone();
let wof_db = self.wof_db.clone();
let pip_tree = self.pip_tree.clone();

handles.push(spawn(async move {
let mut read = admin_cache.begin_read().unwrap();
Expand All @@ -168,8 +198,14 @@ impl Importer {
);
}

match populate_admin_areas(&read, to_cache_sender.clone(), &mut poi, &wof_db)
.await
match populate_admin_areas(
&read,
to_cache_sender.clone(),
&mut poi,
&wof_db,
&pip_tree,
)
.await
{
Ok(()) => {
let poi = SchemafiedPoi::from(poi);
Expand Down
8 changes: 6 additions & 2 deletions airmail_indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod error;
mod importer;
mod pip_tree;
mod query_pip;
mod wof;

Expand All @@ -12,9 +13,10 @@ use airmail::poi::ToIndexPoi;
use anyhow::Result;
use crossbeam::channel::Sender;
use lingua::{IsoCode639_3, Language};
use pip_tree::PipTree;
use redb::{ReadTransaction, TableDefinition};
use std::str::FromStr;
use wof::WhosOnFirst;
use wof::{ConcisePipResponse, WhosOnFirst};

pub(crate) const TABLE_AREAS: TableDefinition<u64, &[u8]> = TableDefinition::new("admin_areas");
pub(crate) const TABLE_NAMES: TableDefinition<u64, &str> = TableDefinition::new("admin_names");
Expand Down Expand Up @@ -58,8 +60,10 @@ pub(crate) async fn populate_admin_areas(
to_cache_sender: Sender<WofCacheItem>,
poi: &mut ToIndexPoi,
wof_db: &WhosOnFirst,
pip_tree: &Option<PipTree<ConcisePipResponse>>,
) -> Result<()> {
let pip_response = query_pip::query_pip(read, to_cache_sender, poi.s2cell, wof_db).await?;
let pip_response =
query_pip::query_pip(read, to_cache_sender, poi.s2cell, wof_db, pip_tree).await?;
for admin in pip_response.admin_names {
poi.admins.push(admin);
}
Expand Down
Loading

0 comments on commit c3330a2

Please sign in to comment.