
Commit

137 incorrect chunking when loading nwm retrospective data across years (#138)

* Refactoring chunking method for NWM retro and USGS loading

* Version number and changelog entry

* Added a create-periods-based-on-chunksize function for consistent loading

---------

Co-authored-by: Sam Lamont <[email protected]>
samlamont and Sam Lamont authored Mar 22, 2024
1 parent 960ff96 commit 8340b5c
Showing 10 changed files with 304 additions and 147 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -42,8 +42,8 @@ $ poetry add git+https://github.com/RTIInternational/teehr.git#[BRANCH TAG]

Use Docker
```bash
$ docker build -t teehr:v0.3.11 .
$ docker run -it --rm --volume $HOME:$HOME -p 8888:8888 teehr:v0.3.11 jupyter lab --ip 0.0.0.0 $HOME
$ docker build -t teehr:v0.3.12 .
$ docker run -it --rm --volume $HOME:$HOME -p 8888:8888 teehr:v0.3.12 jupyter lab --ip 0.0.0.0 $HOME
```

## Examples
13 changes: 13 additions & 0 deletions docs/sphinx/changelog/index.rst
@@ -1,6 +1,19 @@
Release Notes
=============

0.3.12 - 2024-03-21
--------------------

Added
^^^^^
* None

Changed
^^^^^^^

* Changed the chunking method for USGS and NWM retrospective data loading to iterate over pandas ``period_range``
rather than using ``groupby`` or ``date_range`` to fix a bug when fetching data over multiple years.

0.3.11 - 2024-03-19
--------------------

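To illustrate the change described in the 0.3.12 entry above, here is a minimal sketch of period-based chunking with pandas `period_range`. The helper name mirrors the one added in this commit, but the frequency mapping and behavior shown are assumptions, not the library's actual implementation (which lives in `teehr.loading.nwm.utils`):

```python
import pandas as pd

def create_periods_based_on_chunksize(start_date, end_date, chunk_by):
    """Return pandas Periods spanning [start_date, end_date], or [None] for one pass."""
    # Assumed mapping from the chunk_by keyword to a pandas Period frequency.
    freq_map = {"day": "D", "week": "W", "month": "M", "year": "Y"}
    if chunk_by is None:
        return [None]
    if chunk_by not in freq_map:
        raise ValueError(f"Invalid chunk_by value: {chunk_by}")
    return pd.period_range(start=start_date, end=end_date, freq=freq_map[chunk_by])

# Periods carry their year, so a range spanning a year boundary is chunked
# correctly, whereas grouping by calendar month alone can merge, for example,
# January 2007 with January 2008.
print(list(create_periods_based_on_chunksize("2007-11-15", "2008-02-10", "month")))
# [Period('2007-11', 'M'), Period('2007-12', 'M'), Period('2008-01', 'M'), Period('2008-02', 'M')]
```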
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "teehr"
version = "0.3.11"
version = "0.3.12"
description = "Tools for Exploratory Evaluation in Hydrologic Research"
authors = [
"RTI International",
107 changes: 63 additions & 44 deletions src/teehr/loading/nwm/retrospective_grids.py
@@ -1,5 +1,4 @@
"""A module for loading retrospective NWM gridded data."""
import time
from datetime import datetime
from pathlib import Path
from typing import Union, Optional, Tuple, Dict
@@ -18,7 +17,12 @@
SupportedNWMRetroDomainsEnum
)
from teehr.models.loading.nwm22_grid import ForcingVariablesEnum
from teehr.loading.nwm.utils import write_parquet_file, get_dataset
from teehr.loading.nwm.utils import (
write_parquet_file,
get_dataset,
get_period_start_end_times,
create_periods_based_on_chunksize
)
from teehr.loading.nwm.retrospective_points import (
format_grouped_filename,
validate_start_end_date,
@@ -84,7 +88,7 @@ def construct_nwm21_json_paths(
"s3://ciroh-nwm-zarr-retrospective-data-copy/"
"noaa-nwm-retrospective-2-1-zarr-pds/forcing"
)
date_rng = pd.date_range(start_date, end_date, freq="H")
date_rng = pd.date_range(start_date, end_date, freq="h")
dates = []
paths = []
for dt in date_rng:
@@ -216,25 +220,30 @@ def nwm_retro_grids_to_parquet(
# Construct Kerchunk-json paths within the selected time
nwm21_paths = construct_nwm21_json_paths(start_date, end_date)

if chunk_by is None:
gps = [(None, nwm21_paths)]

if chunk_by == "week":
gps = nwm21_paths.groupby(
pd.Grouper(key='datetime', axis=0, freq='W', sort=True)
if chunk_by in ["year", "location_id"]:
raise ValueError(
f"Chunkby '{chunk_by}' is not implemented for gridded data"
)

if chunk_by == "month":
gps = nwm21_paths.groupby(
pd.Grouper(key='datetime', axis=0, freq='M', sort=True)
)
periods = create_periods_based_on_chunksize(
start_date=start_date,
end_date=end_date,
chunk_by=chunk_by
)

if chunk_by == "year":
raise ValueError(
"Chunkby 'year' is not yet implemented for gridded data"
)
for period in periods:
if period is not None:
dts = get_period_start_end_times(
period=period,
start_date=start_date,
end_date=end_date
)
df = nwm21_paths[nwm21_paths.datetime.between(
dts["start_dt"], dts["end_dt"]
)].copy()
else:
df = nwm21_paths.copy()

for _, df in gps:
# Process this chunk using dask delayed
results = []
for row in df.itertuples():
@@ -256,15 +265,16 @@
"configuration were found in GCS!")
chunk_df = pd.concat(output)

start = df.datetime.min().strftime("%Y%m%dZ")
end = df.datetime.max().strftime("%Y%m%dZ")
start = df.datetime.min().strftime("%Y%m%d%HZ")
end = df.datetime.max().strftime("%Y%m%d%HZ")
if start == end:
output_filename = Path(output_parquet_dir, f"{start}.parquet")
else:
output_filename = Path(
output_parquet_dir,
f"{start}_{end}.parquet"
)

write_parquet_file(
filepath=output_filename,
overwrite_output=overwrite_output,
@@ -298,21 +308,29 @@ def nwm_retro_grids_to_parquet(
cols = weights_df.col.values
weight_vals = weights_df.weight.values

if chunk_by is None:
gps = [(None, var_da)]
if chunk_by in ["year", "location_id"]:
raise ValueError(
f"Chunkby '{chunk_by}' is not implemented for gridded data"
)

if chunk_by == "week":
gps = var_da.groupby(var_da.time.dt.isocalendar().week)
periods = create_periods_based_on_chunksize(
start_date=start_date,
end_date=end_date,
chunk_by=chunk_by
)

if chunk_by == "month":
gps = var_da.groupby("time.month")
for period in periods:

if chunk_by == "year":
raise ValueError(
"Chunkby 'year' is not yet implemented for gridded data"
)
if period is not None:
dts = get_period_start_end_times(
period=period,
start_date=start_date,
end_date=end_date
)
else:
dts = {"start_dt": start_date, "end_dt": end_date}

for _, da_i in gps:
da_i = var_da.sel(time=slice(dts["start_dt"], dts["end_dt"]))

chunk_df = process_group(
da_i=da_i,
@@ -330,25 +348,26 @@
output_parquet_dir,
fname
)

write_parquet_file(
filepath=output_filename,
overwrite_output=overwrite_output,
data=chunk_df)


if __name__ == "__main__":
# if __name__ == "__main__":

t0 = time.time()
# # t0 = time.time()

nwm_retro_grids_to_parquet(
nwm_version="nwm30",
variable_name="RAINRATE",
zonal_weights_filepath="/mnt/data/ciroh/wbdhuc10_weights.parquet",
start_date="2008-05-22 00:00",
end_date="2008-05-22 23:00",
output_parquet_dir="/mnt/data/ciroh/retro",
overwrite_output=True,
chunk_by=None
)
# nwm_retro_grids_to_parquet(
# nwm_version="nwm21",
# variable_name="RAINRATE",
# zonal_weights_filepath="/mnt/data/merit/YalansBasins/cat_pfaf_7_conus_subset_nwm_v30_weights.parquet",
# start_date="2007-09-01 00:00",
# end_date="2008-3-22 23:00",
# output_parquet_dir="/mnt/data/ciroh/retro",
# overwrite_output=True,
# chunk_by="week"
# )

print(f"Total elapsed: {(time.time() - t0):.2f} secs")
# # print(f"Total elapsed: {(time.time() - t0):.2f} secs")
65 changes: 30 additions & 35 deletions src/teehr/loading/nwm/retrospective_points.py
@@ -15,7 +15,11 @@
SupportedNWMRetroVersionsEnum,
SupportedNWMRetroDomainsEnum
)
from teehr.loading.nwm.utils import write_parquet_file
from teehr.loading.nwm.utils import (
write_parquet_file,
get_period_start_end_times,
create_periods_based_on_chunksize
)

NWM20_MIN_DATE = datetime(1993, 1, 1)
NWM20_MAX_DATE = datetime(2018, 12, 31, 23)
@@ -227,28 +231,17 @@ def nwm_retro_to_parquet(
if not output_dir.exists():
output_dir.mkdir(parents=True)

ds = xr.open_zarr(
da = xr.open_zarr(
fsspec.get_mapper(s3_zarr_url, anon=True), consolidated=True
).sel(feature_id=location_ids, time=slice(start_date, end_date))

# Fetch all at once
if chunk_by is None:

da = ds[variable_name]
df = da_to_df(nwm_version, da)
min_time = df.value_time.min().strftime("%Y%m%d%HZ")
max_time = df.value_time.max().strftime("%Y%m%d%HZ")
output_filepath = Path(
output_parquet_dir, f"{min_time}_{max_time}.parquet"
)
write_parquet_file(output_filepath, overwrite_output, df)
return
)[variable_name].sel(
feature_id=location_ids, time=slice(start_date, end_date)
)

# Fetch data by site
if chunk_by == "location_id":
for location_id in location_ids:

da = ds[variable_name].sel(feature_id=location_id)
da = da.sel(feature_id=location_id)
df = da_to_df(nwm_version, da)
min_time = df.value_time.min().strftime("%Y%m%d%HZ")
max_time = df.value_time.max().strftime("%Y%m%d%HZ")
@@ -259,26 +252,28 @@
write_parquet_file(output_filepath, overwrite_output, df)
return

# Group dataset by day, week, month, or year
if chunk_by == "day":
gps = ds.groupby("time.day")
# Chunk data by time
periods = create_periods_based_on_chunksize(
start_date=start_date,
end_date=end_date,
chunk_by=chunk_by
)

if chunk_by == "week":
# Calendar week: Monday to Sunday
gps = ds.groupby(ds.time.dt.isocalendar().week)
for period in periods:

if chunk_by == "month":
# Calendar month
gps = ds.groupby("time.month")
if period is not None:
dts = get_period_start_end_times(
period=period,
start_date=start_date,
end_date=end_date
)
else:
dts = {"start_dt": start_date, "end_dt": end_date}

if chunk_by == "year":
# Calendar year
gps = ds.groupby("time.year")
da_i = da.sel(time=slice(dts["start_dt"], dts["end_dt"]))

# Process the data by selected chunk
for _, ds_i in gps:
df = da_to_df(nwm_version, ds_i[variable_name])
output_filename = format_grouped_filename(ds_i)
df = da_to_df(nwm_version, da_i)
output_filename = format_grouped_filename(da_i)
output_filepath = Path(
output_parquet_dir, output_filename
)
@@ -303,10 +298,10 @@
# nwm_version="nwm20",
# variable_name="streamflow",
# start_date=datetime(2000, 1, 1),
# end_date=datetime(2000, 1, 2),
# end_date=datetime(2000, 10, 2),
# location_ids=LOCATION_IDS,
# output_parquet_dir=Path(Path().home(), "temp", "nwm20_retrospective"),
# chunk_by="day",
# chunk_by="month",
# )

# nwm_retro_to_parquet(
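The point-data loader in `nwm_retro_to_parquet` applies the same pattern by slicing the zarr-backed DataArray per period. A minimal sketch with synthetic data follows (this does not touch the NWM retrospective store; the DataArray here is only a stand-in):

```python
import numpy as np
import pandas as pd
import xarray as xr

# Synthetic stand-in for the retrospective streamflow DataArray.
times = pd.date_range("2000-01-01", "2000-03-31 23:00", freq="h")
da = xr.DataArray(
    np.random.rand(times.size), coords={"time": times}, dims="time", name="streamflow"
)

for period in pd.period_range("2000-01-01", "2000-03-31", freq="M"):
    # Slice this period out of the full series and treat it as one output chunk.
    da_i = da.sel(time=slice(period.start_time, period.end_time))
    df = da_i.to_dataframe().reset_index()
    print(period, len(df))  # e.g. Period('2000-01', 'M') 744
```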
