98 is none required in attribute column name (#104)
* updating join_attributes

* refactoring de-dupe on create_join_and_save_timeseries_query for db

* default order_by, changelog updates

* update teehr-hub config

---------

Co-authored-by: Sam Lamont <[email protected]>
Co-authored-by: Matt Denno <[email protected]>
3 people authored Dec 12, 2023
1 parent 5106099 commit c251332
Showing 7 changed files with 105 additions and 87 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Changed `primary_max_value_time`, `secondary_max_value_time` and `max_value_timedelta`
queries to use built-in functions instead of CTEs. This improves speed significantly.
* Fixed bug in queries when filtering by `configuration`, `measurement_unit` and `variable`.
* Refactored `join_attributes` in `TEEHRDatasetDB` to better handle attributes with no units.
* Refactored `create_join_and_save_timeseries_query` so that the de-duplication
CTE comes after the initial join CTE for improved performance.
* Changed the default list of `order_by` fields in `insert_joined_timeseries` to improve
query performance (see the usage sketch below).


## [0.3.1] - 2023-12-08
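The last two entries above correspond to the new default `order_by` visible in the diff below. A usage sketch, assuming `TEEHRDatasetDB` takes a database filepath (the import path is inferred from this commit's file layout, and the parquet paths are placeholders):

```python
# Illustrative only: TEEHRDatasetDB is assumed to take a database
# filepath, and the import path is inferred from this commit's layout.
from teehr.database.teehr_dataset import TEEHRDatasetDB

tds = TEEHRDatasetDB("temp_test.db")
tds.insert_joined_timeseries(
    primary_filepath="timeseries/*short_obs.parquet",
    secondary_filepath="timeseries/*_fcast.parquet",
    crosswalk_filepath="geo/crosswalk.parquet",
    # The new defaults (configuration, variable_name, measurement_unit,
    # value_time) apply when order_by is omitted; override as needed:
    order_by=["primary_location_id", "value_time"],
)
```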
98 changes: 41 additions & 57 deletions src/teehr/database/teehr_dataset.py
@@ -744,8 +744,11 @@ def insert_joined_timeseries(
secondary_filepath: Union[str, Path],
crosswalk_filepath: Union[str, Path],
order_by: List[str] = [
"reference_time",
"primary_location_id",
"configuration",
"variable_name",
"measurement_unit",
"value_time"
],
drop_added_fields=False,
):
@@ -800,35 +803,6 @@ def _get_unique_attributes(self, attributes_filepath: str) -> List:
attr_list = self.query(query, format="df").to_dict(orient="records")
return attr_list

def _pivot_attribute_table(
self, attributes_filepath: str, attr: duckdb.DuckDBPyRelation
) -> duckdb.DuckDBPyRelation:
"""Pivots an attribute table selected as a name-unit pair.
The schema of the returned table consists of a field whose name
is a combination of attribute_name and attribute_unit, and whose
values are attribute_value"""

query = f"""
WITH attribute AS (
SELECT *
FROM
read_parquet('{attributes_filepath}')
WHERE
attribute_name = '{attr["attribute_name"]}'
AND
attribute_unit = '{attr["attribute_unit"]}'
)
PIVOT
attribute
ON
attribute_name, attribute_unit
USING
FIRST(attribute_value)
;"""
attr_pivot = self.query(query=query, format="relation")

return attr_pivot

def _add_field_name_to_joined_timeseries(
self, field_name: str, field_dtype="VARCHAR"
):
@@ -842,23 +816,6 @@ def _add_field_name_to_joined_timeseries(
;"""
self.query(query)

def _join_attribute_values(self, field_name: str):
"""Join values of the new attr field on location_id"""
update_query = f"""
UPDATE
joined_timeseries
SET
{field_name} = (
SELECT
attr_pivot.{field_name}
FROM
attr_pivot
WHERE
joined_timeseries.primary_location_id = attr_pivot.location_id)
;"""
print(f"Joining {field_name} values to joined_timeseries")
self.query(update_query)

def join_attributes(self, attributes_filepath: Union[str, Path]):
"""Joins attributes from the provided attribute table(s) to new
fields in the joined_timeseries table
@@ -873,20 +830,47 @@ def join_attributes(self, attributes_filepath: Union[str, Path]):
attr_list = self._get_unique_attributes(str(attributes_filepath))

for attr in attr_list:
# Pivot the single attribute
attr_pivot = self._pivot_attribute_table(
str(attributes_filepath), attr
)

# Add the attr field name to joined_timeseries
field_name = attr_pivot.columns
field_name.remove("location_id")
field_name = self._sanitize_field_name(field_name[0])
if attr["attribute_unit"]:
field_name = (
f"{attr['attribute_name']}_{attr['attribute_unit']}"
)
unit_clause = (
f"AND attribute_unit = '{attr['attribute_unit']}'"
)
else:
field_name = attr["attribute_name"]
unit_clause = ""

field_name = self._sanitize_field_name(field_name)

self._add_field_name_to_joined_timeseries(field_name)

# Join the attribute values to the new field, attr_pivot
self._join_attribute_values(field_name)
query = f"""
WITH selected_attribute AS (
SELECT
*
FROM
read_parquet('{attributes_filepath}')
WHERE
attribute_name = '{attr['attribute_name']}'
{unit_clause}
)
UPDATE
joined_timeseries
SET
{field_name} = (
SELECT
CAST(attribute_value AS VARCHAR)
FROM
selected_attribute
WHERE
joined_timeseries.primary_location_id =
selected_attribute.location_id
)
;"""

self.query(query)

def calculate_field(
self,
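To make the refactored `join_attributes` concrete: attributes that carry a unit become a `name_unit` column, and attributes without one (such as `ecoregion`) become a column named for the attribute alone. Below is a minimal standalone sketch of that pattern in plain `duckdb`; the `sanitize` helper is a simplified stand-in for `_sanitize_field_name`, whose actual rules are not shown in this diff.

```python
import re
import duckdb

def sanitize(name: str) -> str:
    # Simplified stand-in: replace runs of non-word characters with "_".
    return re.sub(r"\W+", "_", name).strip("_")

con = duckdb.connect()
con.sql("CREATE TABLE joined_timeseries (primary_location_id VARCHAR)")
con.sql("INSERT INTO joined_timeseries VALUES ('gage-A')")
con.sql("""
    CREATE TABLE attrs (location_id VARCHAR, attribute_name VARCHAR,
                        attribute_value VARCHAR, attribute_unit VARCHAR)
""")
con.sql("""
    INSERT INTO attrs VALUES
        ('gage-A', 'drainage_area', '50', 'sq_km'),
        ('gage-A', 'ecoregion', 'coastal_plain', NULL)
""")

for name, unit in con.sql(
    "SELECT DISTINCT attribute_name, attribute_unit FROM attrs"
).fetchall():
    # Unit present -> suffix the column name; no unit -> name alone.
    field = sanitize(f"{name}_{unit}") if unit else sanitize(name)
    unit_clause = f"AND attribute_unit = '{unit}'" if unit else ""
    con.sql(f"ALTER TABLE joined_timeseries ADD COLUMN {field} VARCHAR")
    con.sql(f"""
        UPDATE joined_timeseries
        SET {field} = (
            SELECT CAST(attribute_value AS VARCHAR)
            FROM attrs
            WHERE attrs.location_id = joined_timeseries.primary_location_id
              AND attribute_name = '{name}' {unit_clause}
        )
    """)

# Expect columns: primary_location_id, drainage_area_sq_km, ecoregion
print(con.sql("SELECT * FROM joined_timeseries"))
```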
51 changes: 33 additions & 18 deletions src/teehr/queries/duckdb_database.py
@@ -178,29 +178,15 @@ def create_join_and_save_timeseries_query(jtq: JoinedTimeseriesQuery) -> str:
"""

query = f"""
WITH filtered_primary AS (
SELECT * FROM(
SELECT *,
row_number() OVER(
PARTITION BY value_time,
location_id,
configuration,
measurement_unit,
variable_name
ORDER BY reference_time desc)
AS rn
FROM read_parquet("{str(jtq.primary_filepath)}")
) t
WHERE rn = 1
),
joined as (
WITH initial_joined as (
SELECT
sf.reference_time,
sf.value_time,
sf.location_id as secondary_location_id,
sf.value as secondary_value,
sf.configuration,
sf.measurement_unit,
pf.reference_time as primary_reference_time,
sf.variable_name,
pf.value as primary_value,
pf.location_id as primary_location_id,
@@ -209,11 +195,40 @@ def create_join_and_save_timeseries_query(jtq: JoinedTimeseriesQuery) -> str:
FROM read_parquet('{str(jtq.secondary_filepath)}') sf
JOIN read_parquet('{str(jtq.crosswalk_filepath)}') cf
on cf.secondary_location_id = sf.location_id
JOIN filtered_primary pf
JOIN read_parquet("{str(jtq.primary_filepath)}") pf
on cf.primary_location_id = pf.location_id
and sf.value_time = pf.value_time
and sf.measurement_unit = pf.measurement_unit
and sf.variable_name = pf.variable_name
),
joined AS (
SELECT
reference_time
, value_time
, secondary_location_id
, secondary_value
, configuration
, measurement_unit
, variable_name
, primary_value
, primary_location_id
, lead_time
, absolute_difference
FROM(
SELECT *,
row_number()
OVER(
PARTITION BY value_time,
primary_location_id,
configuration,
variable_name,
measurement_unit,
reference_time
ORDER BY primary_reference_time desc
) AS rn
FROM initial_joined
)
WHERE rn = 1
)
INSERT INTO joined_timeseries
SELECT
Expand All @@ -222,7 +237,7 @@ def create_join_and_save_timeseries_query(jtq: JoinedTimeseriesQuery) -> str:
joined
ORDER BY
{",".join(jtq.order_by)}
;""" # noqa
;"""

return query

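The intent of the change above, as a small self-contained `duckdb` example: duplicate primary rows (same key, different `reference_time`) now survive the initial join, and a `ROW_NUMBER()` window afterwards keeps only the row with the most recent primary `reference_time`. The crosswalk join and most columns are omitted for brevity; the data values are made up.

```python
import duckdb

con = duckdb.connect()
# Two primary rows for the same location/value_time, differing only in
# reference_time -- the kind of duplicate the refactor de-dupes.
con.sql("""
    CREATE TABLE primary_ts AS SELECT * FROM (VALUES
        ('gage-A', TIMESTAMP '2023-01-01 00:00', 10.0,
         TIMESTAMP '2022-12-31 00:00'),
        ('gage-A', TIMESTAMP '2023-01-01 00:00', 12.0,
         TIMESTAMP '2022-12-31 12:00')
    ) t(location_id, value_time, value, reference_time)
""")
con.sql("""
    CREATE TABLE secondary_ts AS SELECT * FROM (VALUES
        ('fcst-A', TIMESTAMP '2023-01-01 00:00', 11.0)
    ) t(location_id, value_time, value)
""")

result = con.sql("""
    WITH initial_joined AS (
        SELECT
            sf.value_time,
            sf.value AS secondary_value,
            pf.value AS primary_value,
            pf.location_id AS primary_location_id,
            pf.reference_time AS primary_reference_time
        FROM secondary_ts sf
        JOIN primary_ts pf ON sf.value_time = pf.value_time
    )
    SELECT value_time, primary_location_id, primary_value, secondary_value
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (
                PARTITION BY value_time, primary_location_id
                ORDER BY primary_reference_time DESC
            ) AS rn
        FROM initial_joined
    ) t
    WHERE rn = 1
""")
print(result.fetchall())
# Keeps the primary_value 12.0 row -- the later primary_reference_time wins.
```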
4 changes: 2 additions & 2 deletions teehr-hub/helm-chart/config.yaml
@@ -52,15 +52,15 @@ jupyterhub:
default: True
description: "A r5.xlarge EC2 instance $0.252/hour"
kubespawner_override:
image: 935462133478.dkr.ecr.us-east-2.amazonaws.com/teehr:v0.3.0
image: 935462133478.dkr.ecr.us-east-2.amazonaws.com/teehr:v0.3.2
cpu_limit: null
mem_limit: null
node_selector:
node.kubernetes.io/instance-type: r5.xlarge
- display_name: "TEEHR Evaluation System (16 vCPU/128GB memory) v0.3.0"
description: "A r5.4xlarge EC2 instance @ $1.008/hour"
kubespawner_override:
image: 935462133478.dkr.ecr.us-east-2.amazonaws.com/teehr:v0.3.0
image: 935462133478.dkr.ecr.us-east-2.amazonaws.com/teehr:v0.3.2
cpu_limit: null
mem_limit: null
node_selector:
12 changes: 9 additions & 3 deletions tests/data/test_study/geo/test_attr.csv
@@ -2,6 +2,12 @@ location_id,attribute_name,attribute_value,attribute_unit
gage-A,drainage_area,50,sq_km
gage-B,drainage_area,20,sq_km
gage-C,drainage_area,30,sq_km
gage-A,year_2_discharge,500,cfs
gage-B,year_2_discharge,200,cfs
gage-C,year_2_discharge,300,cfs
gage-A,year_2_discharge,500,ft^3/s
gage-B,year_2_discharge,200,ft^3/s
gage-C,year_2_discharge,300,ft^3/s
gage-A,ecoregion,coastal_plain
gage-B,ecoregion,piedmont
gage-C,ecoregion,blue_ridge
gage-A,drainage_area,150,sq_mi
gage-B,drainage_area,120,sq_mi
gage-C,drainage_area,130,sq_mi
Binary file modified tests/data/test_study/geo/test_attr.parquet
Binary file not shown.
22 changes: 15 additions & 7 deletions tests/test_teehr_dataset_db.py
@@ -7,6 +7,7 @@
# Test data
TEST_STUDY_DIR = Path("tests", "data", "test_study")
PRIMARY_FILEPATH = Path(TEST_STUDY_DIR, "timeseries", "*short_obs.parquet")
PRIMARY_FILEPATH_DUPS = Path(TEST_STUDY_DIR, "timeseries", "*dup_obs.parquet")
SECONDARY_FILEPATH = Path(TEST_STUDY_DIR, "timeseries", "*_fcast.parquet")
CROSSWALK_FILEPATH = Path(TEST_STUDY_DIR, "geo", "crosswalk.parquet")
GEOMETRY_FILEPATH = Path(TEST_STUDY_DIR, "geo", "gages.parquet")
@@ -23,7 +24,7 @@ def test_insert_joined_timeseries():

# Perform the join and insert into duckdb database
tds.insert_joined_timeseries(
primary_filepath=PRIMARY_FILEPATH,
primary_filepath=PRIMARY_FILEPATH_DUPS,
secondary_filepath=SECONDARY_FILEPATH,
crosswalk_filepath=CROSSWALK_FILEPATH,
drop_added_fields=True,
@@ -254,7 +255,7 @@ def test_join_attributes():
# Add attributes
tds.join_attributes(ATTRIBUTES_FILEPATH)

df = tds.get_joined_timeseries_schema()
df = tds.query("SELECT * FROM joined_timeseries;", format="df")

cols = [
"reference_time",
Expand All @@ -269,22 +270,29 @@ def test_join_attributes():
"lead_time",
"absolute_difference",
"drainage_area_sq_km",
"year_2_discharge_cfs",
"drainage_area_sq_mi",
"year_2_discharge_ft_3_s",
"ecoregion"
]
# Make sure attribute fields have been added
assert sorted(df.column_name.tolist()) == sorted(cols)
assert sorted(df.columns.tolist()) == sorted(cols)

# Make sure attribute values are correct
df = tds.query("SELECT * FROM joined_timeseries", format="df")
np.testing.assert_approx_equal(
df.year_2_discharge_cfs.astype(float).sum(), 72000.0, significant=6
df.year_2_discharge_ft_3_s.astype(float).sum(), 72000.0, significant=6
)

np.testing.assert_approx_equal(
df.drainage_area_sq_km.astype(float).sum(), 7200.0, significant=5
)

pass
np.testing.assert_approx_equal(
df.drainage_area_sq_mi.astype(float).sum(), 28800.0, significant=5
)

assert (
df.ecoregion.unique() == ["coastal_plain", "piedmont", "blue_ridge"]
).all()


def test_get_joined_timeseries_schema():
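One note on the expected column `year_2_discharge_ft_3_s` asserted above: the unit `ft^3/s` from the updated test CSV has to be sanitized into a valid identifier. The regex below is an assumed approximation of that behavior, not the library's actual `_sanitize_field_name` implementation:

```python
import re

def sanitize(name: str) -> str:
    # Assumed behavior: replace every non-identifier character with "_".
    return re.sub(r"[^0-9A-Za-z_]+", "_", name)

print(sanitize("year_2_discharge_ft^3/s"))  # -> year_2_discharge_ft_3_s
```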
