Skip to content

Commit 6bdbf6e

Browse files
bbalsermacpie
andauthored
in gateway tracker, update location_changed_at for new records being … (#1055)
* in gateway tracker, update location_changed_at for new records being inserted * Add migration to fix null location_changed_at * Add csv import * Fix API return * Add location check * PR comments fixes --------- Co-authored-by: Macpie <[email protected]>
1 parent 88c3dd5 commit 6bdbf6e

File tree

8 files changed

+175
-12
lines changed

8 files changed

+175
-12
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mobile_config/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ bs58 = { workspace = true }
1515
chrono = { workspace = true }
1616
clap = { workspace = true }
1717
config = { workspace = true }
18+
csv = { workspace = true }
1819
futures = { workspace = true }
1920
futures-util = { workspace = true }
21+
h3o = { workspace = true }
2022
helium-crypto = { workspace = true, features = ["sqlx-postgres"] }
2123
helium-proto = { workspace = true }
2224
hextree = { workspace = true }
@@ -45,7 +47,6 @@ tower-http = { workspace = true }
4547
tracing = { workspace = true }
4648
tracing-subscriber = { workspace = true }
4749
triggered = { workspace = true }
48-
csv = { workspace = true }
4950

5051
coverage-map = { path = "../coverage_map" }
5152
custom-tracing = { path = "../custom_tracing", features = ["grpc"] }

mobile_config/src/cli/import.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use crate::{
2+
gateway::db::{Gateway, LocationChangedAtUpdate},
3+
settings::Settings,
4+
};
5+
use chrono::{DateTime, Utc};
6+
use h3o::{error::InvalidCellIndex, CellIndex};
7+
use helium_crypto::PublicKey;
8+
use serde::Deserialize;
9+
use std::{
10+
fs::File,
11+
path::{Path, PathBuf},
12+
str::FromStr,
13+
time::Instant,
14+
};
15+
16+
#[derive(Debug, clap::Parser)]
17+
pub struct Import {
18+
#[clap(short = 'f')]
19+
file: PathBuf,
20+
21+
#[clap(subcommand)]
22+
cmd: Cmd,
23+
}
24+
25+
#[derive(Debug, clap::Subcommand)]
26+
pub enum Cmd {
27+
HotspotsAssertions,
28+
}
29+
30+
impl Import {
31+
pub async fn run(self, settings: &Settings) -> anyhow::Result<()> {
32+
match self.cmd {
33+
Cmd::HotspotsAssertions => {
34+
custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?;
35+
36+
tracing::info!("started");
37+
38+
let pool = settings.database.connect("mobile-config-store").await?;
39+
40+
let start = Instant::now();
41+
42+
let updates = read_csv(self.file)?
43+
.into_iter()
44+
.filter_map(|row| row.try_into().ok())
45+
.collect::<Vec<LocationChangedAtUpdate>>();
46+
47+
tracing::info!("file read, updating {} records", updates.len());
48+
49+
let updated = Gateway::update_bulk_location_changed_at(&pool, &updates).await?;
50+
51+
let elapsed = start.elapsed();
52+
tracing::info!(?elapsed, updated, "finished");
53+
54+
Ok(())
55+
}
56+
}
57+
}
58+
}
59+
60+
#[derive(Debug, Deserialize)]
61+
struct CsvRow {
62+
public_key: PublicKey,
63+
// serialnumber: String,
64+
time: DateTime<Utc>,
65+
// latitude: f64,
66+
// longitude: f64,
67+
h3: String,
68+
// assertion_type: String,
69+
}
70+
71+
#[derive(Debug, thiserror::Error)]
72+
pub enum CsvRowError {
73+
#[error("H3 index parse error: {0}")]
74+
H3IndexParseError(#[from] InvalidCellIndex),
75+
}
76+
77+
impl TryFrom<CsvRow> for LocationChangedAtUpdate {
78+
type Error = CsvRowError;
79+
80+
fn try_from(row: CsvRow) -> Result<Self, Self::Error> {
81+
let cell = CellIndex::from_str(&row.h3)?;
82+
83+
Ok(Self {
84+
address: row.public_key.into(),
85+
location_changed_at: row.time,
86+
location: cell.into(),
87+
})
88+
}
89+
}
90+
91+
fn read_csv<P: AsRef<Path>>(path: P) -> anyhow::Result<Vec<CsvRow>> {
92+
let file = File::open(path)?;
93+
let mut rdr = csv::Reader::from_reader(file);
94+
let mut rows = Vec::new();
95+
96+
for result in rdr.deserialize() {
97+
let record: CsvRow = result?;
98+
rows.push(record);
99+
}
100+
101+
Ok(rows)
102+
}

mobile_config/src/cli/mod.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
cli::{api::Api, server::Server},
2+
cli::{api::Api, import::Import, server::Server},
33
settings::Settings,
44
};
55
use base64::{engine::general_purpose, Engine};
@@ -9,6 +9,7 @@ use std::io::Write;
99
use std::{fs::File, path::PathBuf};
1010

1111
pub mod api;
12+
pub mod import;
1213
pub mod server;
1314

1415
#[derive(Debug, clap::Parser)]
@@ -28,9 +29,9 @@ pub struct Cli {
2829
impl Cli {
2930
pub async fn run(self) -> anyhow::Result<()> {
3031
match self.cmd {
31-
Cmd::Server(server) => {
32+
Cmd::Api(api) => {
3233
let settings = Settings::new(self.config)?;
33-
server.run(&settings).await
34+
api.run(&settings).await
3435
}
3536
Cmd::GenerateKey => {
3637
let kp = Keypair::generate(KeyTag::default(), &mut OsRng);
@@ -54,17 +55,22 @@ impl Cli {
5455

5556
Ok(())
5657
}
57-
Cmd::Api(api) => {
58+
Cmd::Import(import) => {
5859
let settings = Settings::new(self.config)?;
59-
api.run(&settings).await
60+
import.run(&settings).await
61+
}
62+
Cmd::Server(server) => {
63+
let settings = Settings::new(self.config)?;
64+
server.run(&settings).await
6065
}
6166
}
6267
}
6368
}
6469

6570
#[derive(Debug, clap::Subcommand)]
6671
pub enum Cmd {
67-
Server(Server),
68-
GenerateKey,
6972
Api(Api),
73+
GenerateKey,
74+
Import(Import),
75+
Server(Server),
7076
}

mobile_config/src/gateway/db.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ pub struct Gateway {
9696
pub location_changed_at: Option<DateTime<Utc>>,
9797
pub location_asserts: Option<u32>,
9898
}
99+
#[derive(Debug)]
100+
pub struct LocationChangedAtUpdate {
101+
pub address: PublicKeyBinary,
102+
pub location_changed_at: DateTime<Utc>,
103+
pub location: u64,
104+
}
99105

100106
impl Gateway {
101107
pub async fn insert_bulk(pool: &PgPool, rows: &[Gateway]) -> anyhow::Result<u64> {
@@ -335,6 +341,48 @@ impl Gateway {
335341
.map_err(anyhow::Error::from)
336342
.filter_map(|res| async move { res.ok() })
337343
}
344+
345+
pub async fn update_bulk_location_changed_at(
346+
pool: &PgPool,
347+
updates: &[LocationChangedAtUpdate],
348+
) -> anyhow::Result<u64> {
349+
if updates.is_empty() {
350+
return Ok(0);
351+
}
352+
353+
const MAX_ROWS: usize = 20000;
354+
let mut total = 0;
355+
356+
for chunk in updates.chunks(MAX_ROWS) {
357+
let mut qb = QueryBuilder::<Postgres>::new(
358+
r#"
359+
UPDATE gateways AS g
360+
SET location_changed_at = v.location_changed_at
361+
FROM (
362+
"#,
363+
);
364+
365+
qb.push_values(chunk, |mut b, update| {
366+
b.push_bind(update.address.as_ref())
367+
.push_bind(update.location_changed_at)
368+
.push_bind(update.location as i64);
369+
});
370+
371+
qb.push(
372+
r#"
373+
) AS v(address, location_changed_at, location)
374+
WHERE g.address = v.address
375+
AND g.location_changed_at IS NULL
376+
AND g.location = v.location
377+
"#,
378+
);
379+
380+
let res = qb.build().execute(pool).await?;
381+
total += res.rows_affected();
382+
}
383+
384+
Ok(total)
385+
}
338386
}
339387

340388
impl FromRow<'_, PgRow> for Gateway {

mobile_config/src/gateway/metadata_db.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,12 @@ impl MobileHotspotInfo {
132132
elevation,
133133
azimuth,
134134
location,
135-
// Updated via SQL query see Gateway::insert
136-
location_changed_at: None,
135+
// Set to refreshed_at when hotspot has a location, None otherwise
136+
location_changed_at: if location.is_some() {
137+
Some(self.refreshed_at.unwrap_or_else(Utc::now))
138+
} else {
139+
None
140+
},
137141
location_asserts: self.num_location_asserts.map(|n| n as u32),
138142
}))
139143
}

mobile_config/src/gateway/service/info.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ impl From<Gateway> for GatewayInfo {
252252
metadata,
253253
device_type: gateway.gateway_type.into(),
254254
created_at: Some(gateway.created_at),
255-
updated_at: Some(gateway.updated_at),
255+
// because updated_at refers to the last time the data was actually changed.
256+
updated_at: Some(gateway.last_changed_at),
256257
refreshed_at: Some(gateway.refreshed_at),
257258
}
258259
}

mobile_config/tests/integrations/gateway_tracker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ async fn execute_test(pool: PgPool) -> anyhow::Result<()> {
4646
assert_eq!(gateway1.elevation, None);
4747
assert_eq!(gateway1.azimuth, None);
4848
assert_eq!(gateway1.location, Some(hex1 as u64));
49-
assert_eq!(gateway1.location_changed_at, None);
49+
assert_eq!(gateway1.location_changed_at, Some(now));
5050
assert_eq!(gateway1.location_asserts, Some(1));
5151

5252
Ok(())

0 commit comments

Comments
 (0)