Skip to content

Commit

Permalink
Check in demo site and update CI, also indexing changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ellenhp committed Feb 10, 2024
1 parent eac5771 commit 6369c70
Show file tree
Hide file tree
Showing 69 changed files with 11,298 additions and 312 deletions.
14 changes: 4 additions & 10 deletions .github/workflows/continuous_deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,14 @@ jobs:
release:
runs-on: ubuntu-latest
steps:
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
target: wasm32-unknown-unknown

- uses: jetli/[email protected]
- uses: jetli/[email protected]

- uses: actions/checkout@v2

- run: cd parser_demo && trunk build --release
- uses: actions/setup-node@v3

- run: cd airmail_site && yarn install && yarn build

- uses: peaceiris/actions-gh-pages@v3
if: github.ref == 'refs/heads/main'
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./parser_demo/dist
publish_dir: ./airmail_site/dist
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ Cargo.lock

**.geojson

data/
data/*
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ Airmail's killer feature is sub-100MB memory consumption for a serving instance.
- [x] Index OpenAddresses data.
- [ ] Index WhosOnFirst data.
- [x] API server.
- [ ] Support and test planet-scale indices.
- [ ] Extend query parser for other locales.
- [ ] Categorical search, e.g. "coffee shops near me".
- [ ] Bounding box biasing and restriction.
- [ ] Investigate feasibility of executing queries against remote indices via HTTP range requests.
- [ ] Minutely updates.
- [ ] Systematic/automatic quality testing in CI.
- [ ] Alternate results, e.g. returning Starbucks locations for "Dunkin Donuts" queries on the US west coast.[^2]

Expand Down
3 changes: 2 additions & 1 deletion airmail_index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ rand = "0.8.5"
subprocess = "0.2.9"
rustyline = "13.0.0"
turbosm = { path = "../turbosm" }
redb = "1.5.0"
num_cpus = "1.16.0"
lru = "0.12.2"

[[bin]]
name = "query"
62 changes: 34 additions & 28 deletions airmail_index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,22 @@ pub async fn populate_admin_areas(poi: &mut AirmailPoi, port: usize) -> Result<(
.locality
.unwrap_or_default()
.iter()
.map(|a| a.name.to_lowercase())
.map(|a| a.to_lowercase())
.collect();
if let Some(neighbourhood) = pip_response.neighbourhood {
locality.extend(neighbourhood.iter().map(|a| a.name.to_lowercase()));
locality.extend(neighbourhood.iter().map(|a| a.to_lowercase()));
}
let region = pip_response
.region
.unwrap_or_default()
.iter()
.map(|a| a.name.to_lowercase())
.map(|a| a.to_lowercase())
.collect();
let country = pip_response
.country
.unwrap_or_default()
.iter()
.map(|a| a.name.to_lowercase())
.map(|a| a.to_lowercase())
.collect();

poi.locality = locality;
Expand All @@ -69,7 +69,7 @@ struct Args {
/// Path to the Docker socket.
#[clap(long, short)]
docker_socket: Option<String>,
/// Path to the Who's On First SQLite database.
/// Path to the Who's On First Spatialite database.
#[clap(long, short)]
wof_db: String,
/// Whether to forcefully recreate the container. Default false.
Expand All @@ -96,7 +96,7 @@ enum ContainerStatus {
DoesNotExist,
}

const PIP_SERVICE_IMAGE: &str = "docker.io/pelias/pip-service:latest";
const PIP_SERVICE_IMAGE: &str = "docker.io/pelias/spatial:latest";

async fn docker_connect() -> Result<Docker, Box<dyn std::error::Error>> {
let docker = if let Some(docker_socket) = &Args::parse().docker_socket {
Expand Down Expand Up @@ -134,10 +134,11 @@ async fn get_container_status(

async fn maybe_start_pip_container(
wof_db_path: &str,
idx: usize,
recreate: bool,
docker: &Docker,
) -> Result<(), Box<dyn std::error::Error>> {
// Holdover from when we had multiple containers.
let idx = 0;
let container_state = get_container_status(idx, docker).await?;
if container_state == ContainerStatus::Running && !recreate {
println!(
Expand All @@ -154,23 +155,26 @@ async fn maybe_start_pip_container(
env: Some(vec![]),
host_config: Some(HostConfig {
port_bindings: Some(HashMap::from([(
3102.to_string(),
3000.to_string(),
Some(vec![bollard::models::PortBinding {
host_ip: None,
host_port: Some(format!("{}", 3102 + idx)),
}]),
)])),
mounts: Some(vec![bollard::models::Mount {
source: Some(wof_db_path.to_string()),
target: Some(
"/mnt/pelias/whosonfirst/sqlite/whosonfirst-data-mapped.db".to_string(),
),
target: Some("/mnt/whosonfirst/whosonfirst-spatialite.db".to_string()),
typ: Some(MountTypeEnum::BIND),
..Default::default()
}]),
..Default::default()
}),
exposed_ports: Some(HashMap::from([("3102/tcp", HashMap::new())])),
cmd: Some(vec![
"server",
"--db",
"/mnt/whosonfirst/whosonfirst-spatialite.db",
]),
exposed_ports: Some(HashMap::from([("3000/tcp", HashMap::new())])),
..Default::default()
};

Expand Down Expand Up @@ -244,23 +248,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let index_path = args.index.clone();
let docker = docker_connect().await?;
let max_pip = 8;
for i in 0..max_pip {
let _ = subprocess::Exec::cmd("chcon")
.arg("-t")
.arg("container_file_t")
.arg(&args.wof_db)
.join();
maybe_start_pip_container(&args.wof_db, i, args.recreate, &docker).await?;
}
let _ = subprocess::Exec::cmd("chcon")
.arg("-t")
.arg("container_file_t")
.arg(&args.wof_db)
.join();
maybe_start_pip_container(&args.wof_db, args.recreate, &docker).await?;

if let Some(turbosm_path) = args.turbosm {
let mut nonblocking_join_handles = Vec::new();
let (no_admin_sender, no_admin_receiver): (Sender<AirmailPoi>, Receiver<AirmailPoi>) =
crossbeam::channel::bounded(1024);
crossbeam::channel::bounded(1024 * 64);
let (to_index_sender, to_index_receiver): (Sender<AirmailPoi>, Receiver<AirmailPoi>) =
crossbeam::channel::bounded(1024);
for _ in 0..128 {
crossbeam::channel::bounded(1024 * 64);

for _ in 0..1.max(num_cpus::get() / 2) {
let no_admin_receiver = no_admin_receiver.clone();
let to_index_sender = to_index_sender.clone();
nonblocking_join_handles.push(spawn(async move {
Expand All @@ -271,8 +273,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
break;
};
let mut sent = false;
for _attempt in 0..5 {
let port = (rand::random::<usize>() % max_pip) + 3102;
for attempt in 0..5 {
if attempt > 0 {
println!("Retrying to populate admin areas.");
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
let port = 3102;
if let Err(err) = populate_admin_areas(&mut poi, port).await {
println!("Failed to populate admin areas. {}", err);
} else {
Expand Down Expand Up @@ -351,7 +357,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
crossbeam::channel::bounded(1024);
let mut blocking_join_handles = Vec::new();
let mut nonblocking_join_handles = Vec::new();
for _ in 0..16 {
for _ in 0..64 {
let receiver = raw_receiver.clone();
let no_admin_sender = no_admin_sender.clone();
let count = count.clone();
Expand Down Expand Up @@ -402,7 +408,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let mut sent = false;
for _attempt in 0..5 {
let port = (rand::random::<usize>() % max_pip) + 3102;
let port = 3102;
if let Err(err) = populate_admin_areas(&mut poi, port).await {
println!("Failed to populate admin areas. {}", err);
} else {
Expand Down
42 changes: 20 additions & 22 deletions airmail_index/src/openstreetmap.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, error::Error};

use airmail::poi::AirmailPoi;
use airmail_common::categories::{
Expand Down Expand Up @@ -79,15 +79,7 @@ fn tags_to_poi(tags: &HashMap<String, String>, lat: f64, lng: f64) -> Option<Air
unit,
lat,
lng,
tags.iter()
.filter_map(|(k, v)| {
if k.starts_with("addr:") || v.len() > 1024 {
None
} else {
Some((k.to_string(), v.to_string()))
}
})
.collect(),
vec![],
)
.unwrap(),
)
Expand Down Expand Up @@ -118,11 +110,15 @@ fn index_way(tags: &HashMap<String, String>, way: &Way) -> Option<AirmailPoi> {
tags_to_poi(&tags, lat, lng)
}

fn relation_centroid(relation: &Relation, level: u32) -> Option<(f64, f64)> {
fn relation_centroid(
relation: &Relation,
level: u32,
turbosm: &Turbosm,
) -> Result<(f64, f64), Box<dyn Error>> {
let mut points = Vec::new();
if level > 3 {
debug!("Skipping relation with level > 10: {:?}", relation.id());
return None;
return Err("Skipping relation with level > 10".into());
}
for member in relation.members() {
match member {
Expand All @@ -134,8 +130,9 @@ fn relation_centroid(relation: &Relation, level: u32) -> Option<(f64, f64)> {
points.push(Point::new(centroid.0, centroid.1));
}
}
RelationMember::Relation(_, relation) => {
if let Some(centroid) = relation_centroid(&relation, level + 1) {
RelationMember::Relation(_, other_relation) => {
let other_relation = turbosm.relation(*other_relation)?;
if let Ok(centroid) = relation_centroid(&other_relation, level + 1, turbosm) {
points.push(Point::new(centroid.0, centroid.1));
} else {
debug!("Skipping relation with no centroid: {:?}", relation.id());
Expand All @@ -144,35 +141,36 @@ fn relation_centroid(relation: &Relation, level: u32) -> Option<(f64, f64)> {
}
}
let multipoint = MultiPoint::from(points);
let centroid = multipoint.centroid()?;
Some((centroid.x(), centroid.y()))
let centroid = multipoint.centroid().ok_or("No centroid")?;
Ok((centroid.x(), centroid.y()))
}

pub fn parse_osm<CB: Sync + Fn(AirmailPoi) -> Result<(), Box<dyn std::error::Error>>>(
db_path: &str,
callback: &CB,
) -> Result<(), Box<dyn std::error::Error>> {
let mut osm = Turbosm::open(db_path).unwrap();
let mut osm =
Turbosm::open(db_path, &["natural", "highway", "admin_level", "boundary"]).unwrap();
println!("Processing nodes");
osm.process_all_nodes(|node| {
osm.process_all_nodes(|node, _| {
if let Some(poi) = tags_to_poi(node.tags(), node.lat(), node.lng()) {
if let Err(err) = callback(poi) {
warn!("Error from callback: {}", err);
}
}
})?;
println!("Processing ways");
osm.process_all_ways(|way| {
osm.process_all_ways(|way, _| {
index_way(way.tags(), &way).map(|poi| {
if let Err(err) = callback(poi) {
warn!("Error from callback: {}", err);
}
});
})?;
println!("Processing relations");
osm.process_all_relations(|relation| {
let centroid = relation_centroid(&relation, 0);
if let Some(centroid) = centroid {
osm.process_all_relations(|relation, turbosm| {
let centroid = relation_centroid(&relation, 0, turbosm);
if let Ok(centroid) = centroid {
if let Some(poi) = tags_to_poi(relation.tags(), centroid.1, centroid.0) {
if let Err(err) = callback(poi) {
warn!("Error from callback: {}", err);
Expand Down
Loading

0 comments on commit 6369c70

Please sign in to comment.