Skip to content

Commit

Permalink
Change field names and obstime parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Lun4m committed Jul 9, 2024
1 parent 1ca0852 commit aa0b558
Showing 1 changed file with 37 additions and 45 deletions.
82 changes: 37 additions & 45 deletions ingestion/src/kvkafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,50 @@ pub enum Error {
Kafka(#[from] kafka::Error),
#[error("postgres returned an error: {0}")]
Database(#[from] tokio_postgres::Error),
#[error(
"no Timeseries ID found for this data - station {}, param {}",
station,
param
)]
TimeseriesMissing { station: i32, param: i32 },
#[error("no Timeseries ID found for this data - station {0}, param {1}")]
TimeseriesMissing(i32, i32),
#[error("error while deserializing message: {0}")]
Deserialize(#[from] quick_xml::DeError),
}

#[derive(Debug, Deserialize)]
/// Represents <KvalobsData>...</KvalobsData>
struct KvalobsData {
station: Vec<Stations>,
#[serde(rename = "station")]
stations: Vec<Station>,
}
#[derive(Debug, Deserialize)]
/// Represents <station>...</station>
struct Stations {
struct Station {
#[serde(rename = "@val")]
val: i32,
typeid: Vec<Typeid>,
#[serde(rename = "typeid")]
typeids: Vec<Typeid>,
}
#[derive(Debug, Deserialize)]
/// Represents <typeid>...</typeid>
struct Typeid {
#[serde(rename = "@val")]
val: i32,
obstime: Vec<Obstime>,
#[serde(rename = "obstime")]
obstimes: Vec<Obstime>,
}
#[derive(Debug, Deserialize)]
/// Represents <obstime>...</obstime>
struct Obstime {
#[serde(rename = "@val")]
val: String, // avoiding parsing time at this point...
tbtime: Vec<Tbtime>,
#[serde(rename = "tbtime")]
tbtimes: Vec<Tbtime>,
}
#[derive(Debug, Deserialize)]
/// Represents <tbtime>...</tbtime>
struct Tbtime {
#[serde(rename = "@val")]
_val: String, // avoiding parsing time at this point...
_kvtextdata: Option<Vec<Kvtextdata>>,
sensor: Vec<Sensor>,
#[serde(rename = "sensor")]
sensors: Vec<Sensor>,
}
/// Represents <kvtextdata>...</kvtextdata>
#[derive(Debug, Deserialize)]
Expand All @@ -71,7 +72,8 @@ struct Kvtextdata {
struct Sensor {
#[serde(rename = "@val", deserialize_with = "zero_to_none")]
val: Option<i32>,
level: Vec<Level>,
#[serde(rename = "level")]
levels: Vec<Level>,
}
/// Represents <level>...</level>
#[derive(Debug, Deserialize)]
Expand All @@ -81,7 +83,7 @@ struct Level {
kvdata: Option<Vec<Kvdata>>,
}

// Change the sensor and level back to null if they are 0
// Deserialize sensor and level to null if they are 0
// 0 is the default for kvalobs, but through obsinn it's actually just missing
fn zero_to_none<'de, D>(des: D) -> Result<Option<i32>, D::Error>
where
Expand Down Expand Up @@ -148,7 +150,7 @@ pub async fn read_and_insert(pool: PgConnectionPool, group_string: String) {
read_kafka(group_string, tx).await;
});

let client = pool.get().await.expect("Couldn't connect to database");
let client = pool.get().await.expect("couldn't connect to database");
while let Some(msg) = rx.recv().await {
if let Err(e) = insert_kvdata(&client, msg).await {
eprintln!("Database insert error: {e}");
Expand All @@ -171,40 +173,35 @@ pub async fn parse_message(message: &[u8], tx: &mpsc::Sender<Msg>) -> Result<(),
));
}

let kvalobs_xmlmsg = match xmlmsg.find("?>") {
let xmlmsg = match xmlmsg.find("?>") {
Some(loc) => &xmlmsg[(loc + 2)..],
None => {
return Err(Error::IssueParsingXML(
"couldn't find end of xml tag '?>'".to_string(),
))
}
};
let item: KvalobsData = quick_xml::de::from_str(kvalobs_xmlmsg)?;
let item: KvalobsData = quick_xml::de::from_str(xmlmsg)?;

// get the useful stuff out of this struct
for station in item.station {
for typeid in station.typeid {
for obstime in typeid.obstime {
// TODO: should we return on error here
for station in item.stations {
for typeid in station.typeids {
for obstime in typeid.obstimes {
let obs_time =
NaiveDateTime::parse_from_str(&obstime.val, "%Y-%m-%d %H:%M:%S")?.and_utc();
// TODO: or continue/break?
// let obs_time =
// match NaiveDateTime::parse_from_str(&obstime.val, "%Y-%m-%d %H:%M:%S") {
// Ok(time) => time.and_utc(),
// Err(e) => {
// eprintln!("{e}");
// break; // continue;
// }
// };
for tbtime in obstime.tbtime {
// NOTE: this is "table time" which can vary from the actual observation time,
// its the first time it entered the db in kvalobs
// currently not using it
// TODO: Do we want to handle text data at all, it doesn't seem to be QCed
match NaiveDateTime::parse_from_str(&obstime.val, "%Y-%m-%d %H:%M:%S") {
Ok(time) => time.and_utc(),
Err(e) => {
eprintln!("{}", Error::IssueParsingTime(e));
continue;
}
};
for tbtime in obstime.tbtimes {
// NOTE: tbtime is "table time" which can vary from the actual observation time,
// it's the first time it entered the db in kvalobs. Currently not using it
// TODO: Do we want to handle text data at all? It doesn't seem to be QCed
// if let Some(textdata) = tbtime.kvtextdata {...}
for sensor in tbtime.sensor {
for level in sensor.level {
for sensor in tbtime.sensors {
for level in sensor.levels {
if let Some(kvdata) = level.kvdata {
for data in kvdata {
let msg = Msg {
Expand Down Expand Up @@ -282,7 +279,7 @@ pub async fn insert_kvdata(
kvdata,
}: Msg,
) -> Result<(), Error> {
// what timeseries is this?
// query timeseries ID
// NOTE: alternately could use conn.query_one, since we want exactly one response
let tsid: i32 = client
.query(
Expand All @@ -302,15 +299,10 @@ pub async fn insert_kvdata(
)
.await?
.first()
.ok_or(Error::TimeseriesMissing {
station: kvid.station,
param: kvid.paramid,
})?
.ok_or(Error::TimeseriesMissing(kvid.station, kvid.paramid))?
.get(0);

// write the data into the db
// kvdata derives ToSql therefore options should be nullable
// https://docs.rs/postgres-types/latest/postgres_types/trait.ToSql.html#nullability
client.execute(
"INSERT INTO flags.kvdata (timeseries, obstime, original, corrected, controlinfo, useinfo, cfailed)
VALUES($1, $2, $3, $4, $5, $6, $7)",
Expand Down

0 comments on commit aa0b558

Please sign in to comment.