@@ -39,22 +39,22 @@ pub async fn populate_admin_areas(poi: &mut AirmailPoi, port: usize) -> Result<(
39
39
. locality
40
40
. unwrap_or_default ( )
41
41
. iter ( )
42
- . map ( |a| a. name . to_lowercase ( ) )
42
+ . map ( |a| a. to_lowercase ( ) )
43
43
. collect ( ) ;
44
44
if let Some ( neighbourhood) = pip_response. neighbourhood {
45
- locality. extend ( neighbourhood. iter ( ) . map ( |a| a. name . to_lowercase ( ) ) ) ;
45
+ locality. extend ( neighbourhood. iter ( ) . map ( |a| a. to_lowercase ( ) ) ) ;
46
46
}
47
47
let region = pip_response
48
48
. region
49
49
. unwrap_or_default ( )
50
50
. iter ( )
51
- . map ( |a| a. name . to_lowercase ( ) )
51
+ . map ( |a| a. to_lowercase ( ) )
52
52
. collect ( ) ;
53
53
let country = pip_response
54
54
. country
55
55
. unwrap_or_default ( )
56
56
. iter ( )
57
- . map ( |a| a. name . to_lowercase ( ) )
57
+ . map ( |a| a. to_lowercase ( ) )
58
58
. collect ( ) ;
59
59
60
60
poi. locality = locality;
@@ -69,7 +69,7 @@ struct Args {
69
69
/// Path to the Docker socket.
70
70
#[ clap( long, short) ]
71
71
docker_socket : Option < String > ,
72
- /// Path to the Who's On First SQLite database.
72
+ /// Path to the Who's On First Spatialite database.
73
73
#[ clap( long, short) ]
74
74
wof_db : String ,
75
75
/// Whether to forcefully recreate the container. Default false.
@@ -96,7 +96,7 @@ enum ContainerStatus {
96
96
DoesNotExist ,
97
97
}
98
98
99
- const PIP_SERVICE_IMAGE : & str = "docker.io/pelias/pip-service :latest" ;
99
+ const PIP_SERVICE_IMAGE : & str = "docker.io/pelias/spatial :latest" ;
100
100
101
101
async fn docker_connect ( ) -> Result < Docker , Box < dyn std:: error:: Error > > {
102
102
let docker = if let Some ( docker_socket) = & Args :: parse ( ) . docker_socket {
@@ -134,10 +134,11 @@ async fn get_container_status(
134
134
135
135
async fn maybe_start_pip_container (
136
136
wof_db_path : & str ,
137
- idx : usize ,
138
137
recreate : bool ,
139
138
docker : & Docker ,
140
139
) -> Result < ( ) , Box < dyn std:: error:: Error > > {
140
+ // Holdover from when we had multiple containers.
141
+ let idx = 0 ;
141
142
let container_state = get_container_status ( idx, docker) . await ?;
142
143
if container_state == ContainerStatus :: Running && !recreate {
143
144
println ! (
@@ -154,23 +155,26 @@ async fn maybe_start_pip_container(
154
155
env : Some ( vec ! [ ] ) ,
155
156
host_config : Some ( HostConfig {
156
157
port_bindings : Some ( HashMap :: from ( [ (
157
- 3102 . to_string ( ) ,
158
+ 3000 . to_string ( ) ,
158
159
Some ( vec ! [ bollard:: models:: PortBinding {
159
160
host_ip: None ,
160
161
host_port: Some ( format!( "{}" , 3102 + idx) ) ,
161
162
} ] ) ,
162
163
) ] ) ) ,
163
164
mounts : Some ( vec ! [ bollard:: models:: Mount {
164
165
source: Some ( wof_db_path. to_string( ) ) ,
165
- target: Some (
166
- "/mnt/pelias/whosonfirst/sqlite/whosonfirst-data-mapped.db" . to_string( ) ,
167
- ) ,
166
+ target: Some ( "/mnt/whosonfirst/whosonfirst-spatialite.db" . to_string( ) ) ,
168
167
typ: Some ( MountTypeEnum :: BIND ) ,
169
168
..Default :: default ( )
170
169
} ] ) ,
171
170
..Default :: default ( )
172
171
} ) ,
173
- exposed_ports : Some ( HashMap :: from ( [ ( "3102/tcp" , HashMap :: new ( ) ) ] ) ) ,
172
+ cmd : Some ( vec ! [
173
+ "server" ,
174
+ "--db" ,
175
+ "/mnt/whosonfirst/whosonfirst-spatialite.db" ,
176
+ ] ) ,
177
+ exposed_ports : Some ( HashMap :: from ( [ ( "3000/tcp" , HashMap :: new ( ) ) ] ) ) ,
174
178
..Default :: default ( )
175
179
} ;
176
180
@@ -244,23 +248,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
244
248
let args = Args :: parse ( ) ;
245
249
let index_path = args. index . clone ( ) ;
246
250
let docker = docker_connect ( ) . await ?;
247
- let max_pip = 8 ;
248
- for i in 0 ..max_pip {
249
- let _ = subprocess:: Exec :: cmd ( "chcon" )
250
- . arg ( "-t" )
251
- . arg ( "container_file_t" )
252
- . arg ( & args. wof_db )
253
- . join ( ) ;
254
- maybe_start_pip_container ( & args. wof_db , i, args. recreate , & docker) . await ?;
255
- }
251
+ let _ = subprocess:: Exec :: cmd ( "chcon" )
252
+ . arg ( "-t" )
253
+ . arg ( "container_file_t" )
254
+ . arg ( & args. wof_db )
255
+ . join ( ) ;
256
+ maybe_start_pip_container ( & args. wof_db , args. recreate , & docker) . await ?;
256
257
257
258
if let Some ( turbosm_path) = args. turbosm {
258
259
let mut nonblocking_join_handles = Vec :: new ( ) ;
259
260
let ( no_admin_sender, no_admin_receiver) : ( Sender < AirmailPoi > , Receiver < AirmailPoi > ) =
260
- crossbeam:: channel:: bounded ( 1024 ) ;
261
+ crossbeam:: channel:: bounded ( 1024 * 64 ) ;
261
262
let ( to_index_sender, to_index_receiver) : ( Sender < AirmailPoi > , Receiver < AirmailPoi > ) =
262
- crossbeam:: channel:: bounded ( 1024 ) ;
263
- for _ in 0 ..128 {
263
+ crossbeam:: channel:: bounded ( 1024 * 64 ) ;
264
+
265
+ for _ in 0 ..1 . max ( num_cpus:: get ( ) / 2 ) {
264
266
let no_admin_receiver = no_admin_receiver. clone ( ) ;
265
267
let to_index_sender = to_index_sender. clone ( ) ;
266
268
nonblocking_join_handles. push ( spawn ( async move {
@@ -271,8 +273,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
271
273
break ;
272
274
} ;
273
275
let mut sent = false ;
274
- for _attempt in 0 ..5 {
275
- let port = ( rand:: random :: < usize > ( ) % max_pip) + 3102 ;
276
+ for attempt in 0 ..5 {
277
+ if attempt > 0 {
278
+ println ! ( "Retrying to populate admin areas." ) ;
279
+ tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 10 ) ) . await ;
280
+ }
281
+ let port = 3102 ;
276
282
if let Err ( err) = populate_admin_areas ( & mut poi, port) . await {
277
283
println ! ( "Failed to populate admin areas. {}" , err) ;
278
284
} else {
@@ -351,7 +357,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
351
357
crossbeam:: channel:: bounded ( 1024 ) ;
352
358
let mut blocking_join_handles = Vec :: new ( ) ;
353
359
let mut nonblocking_join_handles = Vec :: new ( ) ;
354
- for _ in 0 ..16 {
360
+ for _ in 0 ..64 {
355
361
let receiver = raw_receiver. clone ( ) ;
356
362
let no_admin_sender = no_admin_sender. clone ( ) ;
357
363
let count = count. clone ( ) ;
@@ -402,7 +408,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
402
408
} ;
403
409
let mut sent = false ;
404
410
for _attempt in 0 ..5 {
405
- let port = ( rand :: random :: < usize > ( ) % max_pip ) + 3102 ;
411
+ let port = 3102 ;
406
412
if let Err ( err) = populate_admin_areas ( & mut poi, port) . await {
407
413
println ! ( "Failed to populate admin areas. {}" , err) ;
408
414
} else {
0 commit comments