Skip to content

Commit

Permalink
Add unique constraint and update row on conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
Lun4m committed Jul 9, 2024
1 parent aa0b558 commit 94fce6a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
8 changes: 4 additions & 4 deletions db/flags.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ CREATE TABLE IF NOT EXISTS flags.kvdata (
corrected REAL NULL,
controlinfo TEXT NULL,
useinfo TEXT NULL,
cfailed INT4 NULL
cfailed INT4 NULL,
CONSTRAINT unique_kvdata_timeseries_obstime UNIQUE (timeseries, obstime)
);
-- TODO: Probably should define unique constraint on (timeseries, obstime) as we have in public.data?
-- Can kvkafka resend data with same (timeseries, obstime)?
CREATE INDEX IF NOT EXISTS kvdata_obtime_index ON flags.kvdata (obstime);

CREATE INDEX IF NOT EXISTS kvdata_obtime_index ON flags.kvdata (obstime);
CREATE INDEX IF NOT EXISTS kvdata_timeseries_index ON flags.kvdata USING HASH (timeseries);
19 changes: 13 additions & 6 deletions ingestion/src/kvkafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,11 @@ pub async fn insert_kvdata(
// NOTE: alternately could use conn.query_one, since we want exactly one response
let tsid: i32 = client
.query(
"SELECT timeseries FROM labels.met
WHERE station_id = $1 \
AND param_id = $2 \
AND type_id = $3 \
AND (($4::int IS NULL AND lvl IS NULL) OR (lvl = $4)) \
"SELECT timeseries FROM labels.met
WHERE station_id = $1
AND param_id = $2
AND type_id = $3
AND (($4::int IS NULL AND lvl IS NULL) OR (lvl = $4))
AND (($5::int IS NULL AND sensor IS NULL) OR (sensor = $5))",
&[
&kvid.station,
Expand All @@ -305,7 +305,14 @@ pub async fn insert_kvdata(
// write the data into the db
client.execute(
"INSERT INTO flags.kvdata (timeseries, obstime, original, corrected, controlinfo, useinfo, cfailed)
VALUES($1, $2, $3, $4, $5, $6, $7)",
VALUES($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT ON CONSTRAINT unique_kvdata_timeseries_obstime
DO UPDATE SET
original = EXCLUDED.original,
corrected = EXCLUDED.corrected,
controlinfo = EXCLUDED.controlinfo,
useinfo = EXCLUDED.useinfo,
cfailed = EXCLUDED.cfailed",
&[&tsid, &obstime, &kvdata.original, &kvdata.corrected, &kvdata.controlinfo, &kvdata.useinfo, &kvdata.cfailed],
).await?;

Expand Down

0 comments on commit 94fce6a

Please sign in to comment.