Skip to content

Commit

Permalink
Remove unwraps
Browse files Browse the repository at this point in the history
  • Loading branch information
tristan-morris committed Jul 29, 2024
1 parent 2b62435 commit 9ec7c9e
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 73 deletions.
1 change: 0 additions & 1 deletion airmail/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,4 @@ anyhow = "1.0.86"
thiserror = "1.0.63"

[features]
invasive_logging = []
remote_index = ["tantivy/quickwit"]
131 changes: 63 additions & 68 deletions airmail/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use std::path::Path;
use std::sync::Arc;

use anyhow::Result;
use futures_util::future::join_all;
use geo::Rect;
use itertools::Itertools;
use log::debug;
use log::{trace, warn};
use s2::region::RegionCoverer;
use std::collections::BTreeMap;
use tantivy::schema::Value;
Expand Down Expand Up @@ -346,70 +345,66 @@ impl AirmailIndex {
let query_string = query.trim().replace("'s", "s");

let start = std::time::Instant::now();
let (top_docs, searcher) = {
let query = self
.construct_query(
&searcher,
&query_string,
tags,
bbox,
boost_regions,
request_leniency,
)
.await;

#[cfg(feature = "invasive_logging")]
{
dbg!(&query);

let query = self
.construct_query(
&searcher,
&query_string,
tags,
bbox,
boost_regions,
request_leniency,
)
.await;

trace!("Search query: {:?}", &query);

// Perform the search and then resolve the returned documents
let top_docs: Result<Vec<(f32, TantivyDocument)>> = spawn_blocking(move || {
let doc_addresses = searcher.search(&query, &TopDocs::with_limit(10))?;
let mut docs = vec![];
for (score, doc_address) in doc_addresses {
if let Ok(doc) = searcher.doc::<TantivyDocument>(doc_address) {
docs.push((score, doc));
}
}

let (top_docs, searcher) = spawn_blocking(move || {
(searcher.search(&query, &TopDocs::with_limit(10)), searcher)
})
.await?;
let top_docs = top_docs?;
debug!(
"Search took {:?} and yielded {} results",
start.elapsed(),
top_docs.len()
);
(top_docs, searcher)
};
Ok(docs)
})
.await?;

let mut scores = Vec::new();
let mut futures = Vec::new();
for (score, doc_id) in top_docs {
let searcher = searcher.clone();
let doc = spawn_blocking(move || searcher.doc::<TantivyDocument>(doc_id));
scores.push(score);
futures.push(doc);
}
let mut results = Vec::new();
let top_docs = join_all(futures).await;
for (score, doc_future) in scores.iter().zip(top_docs) {
let doc = doc_future??;
let source = doc
.get_first(self.field_source())
.map(|value| value.as_str().unwrap().to_string())
.unwrap_or_default();
let s2cell = doc
.get_first(self.field_s2cell())
.unwrap()
.as_u64()
.unwrap();
let cellid = s2::cellid::CellID(s2cell);
let latlng = s2::latlng::LatLng::from(cellid);
let tags: Vec<(String, String)> = doc
.get_first(self.field_tags())
.unwrap()
.as_object()
.unwrap()
.map(|(k, v)| (k.to_string(), v.as_str().unwrap().to_string()))
.collect();

let poi = AirmailPoi::new(source, latlng.lat.deg(), latlng.lng.deg(), tags)?;
results.push((poi, *score));
}
let top_docs = top_docs.map_err(|e| {
warn!("Search failed: {:?}", e);
e
})?;

trace!(
"Search took {:?} and yielded {} results",
start.elapsed(),
top_docs.len()
);

let results = top_docs
.into_iter()
.flat_map(|(score, doc)| {
let source = doc
.get_first(self.field_source())
.map(|value| value.as_str().unwrap_or_default().to_string())
.unwrap_or_default();
let s2cell = doc.get_first(self.field_s2cell())?.as_u64()?;
let cellid = s2::cellid::CellID(s2cell);
let latlng = s2::latlng::LatLng::from(cellid);
let tags: Vec<(String, String)> = doc
.get_first(self.field_tags())?
.as_object()?
.map(|(k, v)| (k.to_string(), v.as_str().unwrap_or_default().to_string()))
.collect();

AirmailPoi::new(source, latlng.lat.deg(), latlng.lng.deg(), tags)
.ok()
.map(|poi| (poi, score))
})
.collect::<Vec<_>>();

Ok(results)
}
Expand All @@ -430,7 +425,7 @@ impl AirmailIndexWriter {
for content in poi.content {
self.process_field(&mut doc, &content);
}
doc.add_text(self.schema.get_field(FIELD_SOURCE).unwrap(), source);
doc.add_text(self.schema.get_field(FIELD_SOURCE)?, source);

let indexed_keys = [
"natural", "amenity", "shop", "leisure", "tourism", "historic", "cuisine",
Expand All @@ -443,22 +438,22 @@ impl AirmailIndexWriter {
.any(|prefix| key.starts_with(prefix))
{
doc.add_text(
self.schema.get_field(FIELD_INDEXED_TAG).unwrap(),
self.schema.get_field(FIELD_INDEXED_TAG)?,
format!("{}={}", key, value).as_str(),
);
}
}
doc.add_object(
self.schema.get_field(FIELD_TAGS).unwrap(),
self.schema.get_field(FIELD_TAGS)?,
poi.tags
.iter()
.map(|(k, v)| (k.to_string(), OwnedValue::Str(v.to_string())))
.collect::<BTreeMap<String, OwnedValue>>(),
);

doc.add_u64(self.schema.get_field(FIELD_S2CELL).unwrap(), poi.s2cell);
doc.add_u64(self.schema.get_field(FIELD_S2CELL)?, poi.s2cell);
for parent in poi.s2cell_parents {
doc.add_u64(self.schema.get_field(FIELD_S2CELL_PARENTS).unwrap(), parent);
doc.add_u64(self.schema.get_field(FIELD_S2CELL_PARENTS)?, parent);
}
self.tantivy_writer.add_document(doc)?;

Expand Down
3 changes: 3 additions & 0 deletions airmail/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#![forbid(unsafe_code)]
#![warn(clippy::missing_panics_doc)]

#[macro_use]
extern crate lazy_static;

Expand Down
1 change: 0 additions & 1 deletion airmail_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@ anyhow = "1.0.86"
thiserror = "1.0.63"

[features]
invasive_logging = ["airmail/invasive_logging"]
remote_index = ["airmail/remote_index"]
default = ["remote_index"]
8 changes: 7 additions & 1 deletion airmail_service/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use axum::{
};
use deunicode::deunicode;
use geo::{Coord, Rect};
use log::debug;
use serde::{Deserialize, Serialize};

use crate::error::AirmailServiceError;
Expand Down Expand Up @@ -73,7 +74,12 @@ pub async fn search(

let results = index.search(&query, leniency, tags, bbox, &[]).await?;

println!("{} results found in {:?}", results.len(), start.elapsed());
debug!(
"Query: {:?} produced: {} results found in {:?}",
params,
results.len(),
start.elapsed()
);

let response = Response {
metadata: MetadataResponse { query: params },
Expand Down
13 changes: 11 additions & 2 deletions airmail_service/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![forbid(unsafe_code)]
#![warn(clippy::pedantic)]

use std::future::IntoFuture;
use std::sync::Arc;

use airmail::index::AirmailIndex;
Expand All @@ -9,8 +10,9 @@ use api::search;
use axum::{http::HeaderValue, routing::get, Router};
use clap::Parser;
use env_logger::Env;
use log::{debug, info};
use log::{debug, info, warn};
use tokio::net::TcpListener;
use tokio::select;
use tower_http::cors::CorsLayer;

mod api;
Expand Down Expand Up @@ -60,7 +62,14 @@ async fn main() -> Result<()> {

info!("Listening at: {}/search?q=query", args.bind);
let listener = TcpListener::bind(args.bind).await?;
axum::serve(listener, app).await?;
let server = axum::serve(listener, app.into_make_service()).into_future();

select! {
_ = server => {}
_ = tokio::signal::ctrl_c() => {
warn!("Received ctrl-c, shutting down");
}
}

Ok(())
}

0 comments on commit 9ec7c9e

Please sign in to comment.