Skip to content

Commit

Permalink
105 add get joined timeseries to teehrdataset (#106)
Browse files Browse the repository at this point in the history
* adding get_joined_timeseries query to TDS

* version bump and changelog update

* explicit argument types in queries/util.py

* doc string update

---------

Co-authored-by: Sam Lamont <[email protected]>
  • Loading branch information
samlamont and Sam Lamont authored Feb 5, 2024
1 parent c251332 commit afcd762
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 152 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.3.3] - 2023-12-13

### Added
* Adds `get_joined_timeseries` method to TEEHR Dataset classes.

### Changed
* Updated validation fields in the `TimeSeriesQuery` pydantic model to accept only selected fields
rather than existing database fields.
* Updated function argument typing in `queries/utils.py` to be more explicit

## [0.3.2] - 2023-12-12

### Added
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ $ poetry add git+https://github.com/RTIInternational/teehr.git#[BRANCH TAG]

Use Docker
```bash
$ docker build -t teehr:v0.3.2 .
$ docker run -it --rm --volume $HOME:$HOME -p 8888:8888 teehr:v0.3.2 jupyter lab --ip 0.0.0.0 $HOME
$ docker build -t teehr:v0.3.3 .
$ docker run -it --rm --volume $HOME:$HOME -p 8888:8888 teehr:v0.3.3 jupyter lab --ip 0.0.0.0 $HOME
```

## Examples
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "teehr"
version = "0.3.2"
version = "0.3.3"
description = "Tools for Exploratory Evaluation in Hydrologic Research"
authors = [
"RTI International",
Expand Down
149 changes: 147 additions & 2 deletions src/teehr/database/teehr_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import teehr.queries.utils as tqu
from teehr.models.queries_database import (
JoinedFieldNameEnum,
InsertJoinedTimeseriesQuery,
JoinedTimeseriesQuery,
CalculateField,
MetricQuery,
Expand Down Expand Up @@ -387,6 +388,69 @@ def get_metrics(
df = self.query(query, format="df")
return df

def get_joined_timeseries(
self,
jtq: JoinedTimeseriesQuery
) -> Union[pd.DataFrame, gpd.GeoDataFrame, str]:
"""Retrieve joined timeseries using database query.
Parameters
----------
jtq : JoinedTimeseriesQuery
Pydantic model containing query parameters
tq Fields
----------
order_by : List[str]
List of column/field names to order results by.
Must provide at least one.
filters : Union[List[dict], None] = None
List of dictionaries describing the "where" clause to limit data
that is included in metrics.
return_query: bool = False
True returns the query string instead of the data
include_geometry : bool
True joins the geometry to the query results.
Only works if `primary_location_id`
is included as a group_by field.
Returns
-------
Union[pd.DataFrame, gpd.GeoDataFrame, str]
A DataFrame or GeoDataFrame of query results
or the query itself as a string
Order By and Filter By Fields
-----------------------------------
* reference_time
* primary_location_id
* secondary_location_id
* primary_value
* secondary_value
* value_time
* configuration
* measurement_unit
* variable_name
* lead_time
* absolute_difference
* [any user-added fields]
"""

jtq = self._validate_query_model(jtq)

query = tqu_db.create_get_joined_timeseries_query(jtq)

if jtq.return_query:
return tqu.remove_empty_lines(query)
elif jtq.include_geometry:
self._check_if_geometry_is_inserted()
df = self.query(query, format="df")
return tqu.df_to_gdf(df)
else:
df = self.query(query, format="df")
return df

def get_timeseries(
self,
tq: TimeseriesQuery
Expand Down Expand Up @@ -434,9 +498,10 @@ def get_timeseries(
Order By Fields
---------------
* reference_time
* primary_location_id
* secondary_location_id
* location_id
* value
* primary_value
* secondary_value
* value_time
Expand Down Expand Up @@ -777,7 +842,7 @@ def insert_joined_timeseries(
"""
self._validate_joined_timeseries_base_fields(drop_added_fields)

jtq = JoinedTimeseriesQuery.model_validate(
jtq = InsertJoinedTimeseriesQuery.model_validate(
{
"primary_filepath": primary_filepath,
"secondary_filepath": secondary_filepath,
Expand Down Expand Up @@ -1086,6 +1151,73 @@ def get_metrics(
df = self.query(query, format="df")
return df

def get_joined_timeseries(
self,
order_by: List[str],
filters: Union[List[dict], None] = None,
include_geometry: bool = False,
return_query: bool = False,
) -> Union[pd.DataFrame, gpd.GeoDataFrame, str]:
"""Retrieve joined timeseries using database query.
Parameters
----------
order_by : List[str]
List of column/field names to order results by.
Must provide at least one.
filters : Union[List[dict], None] = None
List of dictionaries describing the "where" clause to limit data
that is included in metrics.
include_geometry : bool
True joins the geometry to the query results.
Only works if `primary_location_id`
is included as a group_by field.
return_query: bool = False
True returns the query string instead of the data
Returns
-------
Union[pd.DataFrame, gpd.GeoDataFrame str]
A DataFrame or GeoDataFrame of query results
or the query itself as a string
Order By and Filter By Fields
-----------------------------------
* reference_time
* primary_location_id
* secondary_location_id
* primary_value
* secondary_value
* value_time
* configuration
* measurement_unit
* variable_name
* lead_time
* absolute_difference
* [any user-added fields]
"""

data = {
"order_by": order_by,
"filters": filters,
"return_query": return_query,
"include_geometry": include_geometry,
}
jtq = self._validate_query_model(JoinedTimeseriesQuery, data)

query = tqu_db.create_get_joined_timeseries_query(jtq)

if jtq.return_query:
return tqu.remove_empty_lines(query)
elif jtq.include_geometry:
self._check_if_geometry_is_inserted()
df = self.query(query, format="df")
return tqu.df_to_gdf(df)
else:
df = self.query(query, format="df")
return df

def get_timeseries(
self,
order_by: List[str],
Expand Down Expand Up @@ -1127,6 +1259,19 @@ def get_timeseries(
* lead_time
* [any user-added fields]
Order By Fields
---------------
* primary_location_id
* secondary_location_id
* location_id
* value
* primary_value
* secondary_value
* value_time
* configuration
* measurement_unit
* variable_name
"""
data = {
"order_by": order_by,
Expand Down
58 changes: 55 additions & 3 deletions src/teehr/models/queries_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,18 @@ def in_operator_must_have_iterable(
return v


class JoinedTimeseriesQuery(BaseModel):
class InsertJoinedTimeseriesQuery(BaseModel):
primary_filepath: Union[str, Path]
secondary_filepath: Union[str, Path]
crosswalk_filepath: Union[str, Path]
order_by: Optional[List[JoinedFieldNameEnum]] = []


class TimeseriesQuery(BaseModel):
class JoinedTimeseriesQuery(BaseModel):
order_by: List[str]
filters: Optional[List[Filter]] = []
return_query: Optional[bool] = False
timeseries_name: TimeseriesNameEnum
include_geometry: bool

@field_validator("filters")
def filter_must_be_list(cls, v):
Expand Down Expand Up @@ -180,6 +180,58 @@ def filters_must_exist_as_fields(cls, v, info: ValidationInfo):
return v


class TimeseriesQuery(BaseModel):
order_by: List[str]
filters: Optional[List[Filter]] = []
return_query: Optional[bool] = False
timeseries_name: TimeseriesNameEnum

@field_validator("filters")
def filter_must_be_list(cls, v):
if v is None:
return []
return v

@field_validator("order_by")
def order_by_must_exist_as_fields(cls, v, info: ValidationInfo):
"""order_by fields must be part one of the selected fields or
its alias"""
validation_fields = [
"location_id",
"reference_time",
"value",
"primary_value",
"secondary_value",
"value_time",
"primary_location_id",
"secondary_location_id",
"configuration",
"measurement_unit",
"variable_name"
]
for val in v:
if val not in validation_fields:
raise ValueError(
f"The order_by field '{val}' must be a timeseries"
f" field or its alias: {validation_fields}"
)
return v

@field_validator("filters")
def filters_must_exist_as_fields(cls, v, info: ValidationInfo):
"""filters fields must currently exist in the database"""
context = info.context
if context:
existing_fields = context.get("existing_fields", set())
for val in v:
if val.column not in existing_fields:
raise ValueError(
f"The filters field {val.column} does not"
"exist in the database"
)
return v


class TimeseriesCharQuery(BaseModel):
order_by: List[str]
group_by: List[str]
Expand Down
Loading

0 comments on commit afcd762

Please sign in to comment.