From 8340b5c7d0619103a3db5154e373db930307f635 Mon Sep 17 00:00:00 2001 From: samlamont Date: Fri, 22 Mar 2024 14:51:49 -0400 Subject: [PATCH] 137 incorrect chunking when loading nwm retrospective data across years (#138) * Refactoring chunking method for NWM retro and USGS loading * Version number and changelog entry * Added create periods based on chunk size function for consistent loading --------- Co-authored-by: Sam Lamont --- README.md | 4 +- docs/sphinx/changelog/index.rst | 13 +++ pyproject.toml | 2 +- src/teehr/loading/nwm/retrospective_grids.py | 107 +++++++++++------- src/teehr/loading/nwm/retrospective_points.py | 65 +++++------ src/teehr/loading/nwm/utils.py | 87 +++++++++++++- src/teehr/loading/usgs/usgs.py | 83 +++++--------- tests/test_nwm_loading_utils.py | 72 +++++++++++- tests/test_usgs_loading.py | 16 +-- version.txt | 2 +- 10 files changed, 304 insertions(+), 147 deletions(-) diff --git a/README.md b/README.md index 6ada3d45..0bcc1656 100644 --- a/README.md +++ b/README.md @@ -42,8 +42,8 @@ $ poetry add git+https://github.com/RTIInternational/teehr.git#[BRANCH TAG] Use Docker ```bash -$ docker build -t teehr:v0.3.11 . -$ docker run -it --rm --volume $HOME:$HOME -p 8888:8888 teehr:v0.3.11 jupyter lab --ip 0.0.0.0 $HOME +$ docker build -t teehr:v0.3.12 . +$ docker run -it --rm --volume $HOME:$HOME -p 8888:8888 teehr:v0.3.12 jupyter lab --ip 0.0.0.0 $HOME ``` ## Examples diff --git a/docs/sphinx/changelog/index.rst b/docs/sphinx/changelog/index.rst index e1e9f24c..579fd718 100644 --- a/docs/sphinx/changelog/index.rst +++ b/docs/sphinx/changelog/index.rst @@ -1,6 +1,19 @@ Release Notes ============= +0.3.12 - 2024-03-21 +-------------------- + +Added +^^^^^ +* None + +Changed +^^^^^^^ + +* Changed the chunking method for USGS and NWM retrospective data loading to iterate over pandas ``period_range`` +rather than using ``groupby`` or ``date_range`` to fix a bug when fetching data over multiple years. 
+ 0.3.11 - 2024-03-19 -------------------- diff --git a/pyproject.toml b/pyproject.toml index d5c3466b..9a57c854 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "teehr" -version = "0.3.11" +version = "0.3.12" description = "Tools for Exploratory Evaluation in Hydrologic Research" authors = [ "RTI International", diff --git a/src/teehr/loading/nwm/retrospective_grids.py b/src/teehr/loading/nwm/retrospective_grids.py index 6fbdea03..c886715f 100644 --- a/src/teehr/loading/nwm/retrospective_grids.py +++ b/src/teehr/loading/nwm/retrospective_grids.py @@ -1,5 +1,4 @@ """A module for loading retrospective NWM gridded data.""" -import time from datetime import datetime from pathlib import Path from typing import Union, Optional, Tuple, Dict @@ -18,7 +17,12 @@ SupportedNWMRetroDomainsEnum ) from teehr.models.loading.nwm22_grid import ForcingVariablesEnum -from teehr.loading.nwm.utils import write_parquet_file, get_dataset +from teehr.loading.nwm.utils import ( + write_parquet_file, + get_dataset, + get_period_start_end_times, + create_periods_based_on_chunksize +) from teehr.loading.nwm.retrospective_points import ( format_grouped_filename, validate_start_end_date, @@ -84,7 +88,7 @@ def construct_nwm21_json_paths( "s3://ciroh-nwm-zarr-retrospective-data-copy/" "noaa-nwm-retrospective-2-1-zarr-pds/forcing" ) - date_rng = pd.date_range(start_date, end_date, freq="H") + date_rng = pd.date_range(start_date, end_date, freq="h") dates = [] paths = [] for dt in date_rng: @@ -216,25 +220,30 @@ def nwm_retro_grids_to_parquet( # Construct Kerchunk-json paths within the selected time nwm21_paths = construct_nwm21_json_paths(start_date, end_date) - if chunk_by is None: - gps = [(None, nwm21_paths)] - - if chunk_by == "week": - gps = nwm21_paths.groupby( - pd.Grouper(key='datetime', axis=0, freq='W', sort=True) + if chunk_by in ["year", "location_id"]: + raise ValueError( + f"Chunkby '{chunk_by}' is not implemented for gridded data" ) - if chunk_by == "month": - gps = nwm21_paths.groupby( - pd.Grouper(key='datetime', axis=0, freq='M', sort=True) - ) + periods = create_periods_based_on_chunksize( + start_date=start_date, + end_date=end_date, + chunk_by=chunk_by + ) - if chunk_by == "year": - raise ValueError( - "Chunkby 'year' is not yet implemented for gridded data" - ) + for period in periods: + if period is not None: + dts = get_period_start_end_times( + period=period, + start_date=start_date, + end_date=end_date + ) + df = nwm21_paths[nwm21_paths.datetime.between( + dts["start_dt"], dts["end_dt"] + )].copy() + else: + df = nwm21_paths.copy() - for _, df in gps: # Process this chunk using dask delayed results = [] for row in df.itertuples(): @@ -256,8 +265,8 @@ def nwm_retro_grids_to_parquet( "configuration were found in GCS!") chunk_df = pd.concat(output) - start = df.datetime.min().strftime("%Y%m%dZ") - end = df.datetime.max().strftime("%Y%m%dZ") + start = df.datetime.min().strftime("%Y%m%d%HZ") + end = df.datetime.max().strftime("%Y%m%d%HZ") if start == end: output_filename = Path(output_parquet_dir, f"{start}.parquet") else: @@ -265,6 +274,7 @@ def nwm_retro_grids_to_parquet( output_parquet_dir, f"{start}_{end}.parquet" ) + write_parquet_file( filepath=output_filename, overwrite_output=overwrite_output, @@ -298,21 +308,29 @@ def nwm_retro_grids_to_parquet( cols = weights_df.col.values weight_vals = weights_df.weight.values - if chunk_by is None: - gps = [(None, var_da)] + if chunk_by in ["year", "location_id"]: + raise ValueError( + f"Chunkby '{chunk_by}' is not 
implemented for gridded data" + ) - if chunk_by == "week": - gps = var_da.groupby(var_da.time.dt.isocalendar().week) + periods = create_periods_based_on_chunksize( + start_date=start_date, + end_date=end_date, + chunk_by=chunk_by + ) - if chunk_by == "month": - gps = var_da.groupby("time.month") + for period in periods: - if chunk_by == "year": - raise ValueError( - "Chunkby 'year' is not yet implemented for gridded data" - ) + if period is not None: + dts = get_period_start_end_times( + period=period, + start_date=start_date, + end_date=end_date + ) + else: + dts = {"start_dt": start_date, "end_dt": end_date} - for _, da_i in gps: + da_i = var_da.sel(time=slice(dts["start_dt"], dts["end_dt"])) chunk_df = process_group( da_i=da_i, @@ -330,25 +348,26 @@ def nwm_retro_grids_to_parquet( output_parquet_dir, fname ) + write_parquet_file( filepath=output_filename, overwrite_output=overwrite_output, data=chunk_df) -if __name__ == "__main__": +# if __name__ == "__main__": - t0 = time.time() +# # t0 = time.time() - nwm_retro_grids_to_parquet( - nwm_version="nwm30", - variable_name="RAINRATE", - zonal_weights_filepath="/mnt/data/ciroh/wbdhuc10_weights.parquet", - start_date="2008-05-22 00:00", - end_date="2008-05-22 23:00", - output_parquet_dir="/mnt/data/ciroh/retro", - overwrite_output=True, - chunk_by=None - ) +# nwm_retro_grids_to_parquet( +# nwm_version="nwm21", +# variable_name="RAINRATE", +# zonal_weights_filepath="/mnt/data/merit/YalansBasins/cat_pfaf_7_conus_subset_nwm_v30_weights.parquet", +# start_date="2007-09-01 00:00", +# end_date="2008-3-22 23:00", +# output_parquet_dir="/mnt/data/ciroh/retro", +# overwrite_output=True, +# chunk_by="week" +# ) - print(f"Total elapsed: {(time.time() - t0):.2f} secs") +# # print(f"Total elapsed: {(time.time() - t0):.2f} secs") diff --git a/src/teehr/loading/nwm/retrospective_points.py b/src/teehr/loading/nwm/retrospective_points.py index 898e66a6..865b5c11 100644 --- a/src/teehr/loading/nwm/retrospective_points.py +++ b/src/teehr/loading/nwm/retrospective_points.py @@ -15,7 +15,11 @@ SupportedNWMRetroVersionsEnum, SupportedNWMRetroDomainsEnum ) -from teehr.loading.nwm.utils import write_parquet_file +from teehr.loading.nwm.utils import ( + write_parquet_file, + get_period_start_end_times, + create_periods_based_on_chunksize +) NWM20_MIN_DATE = datetime(1993, 1, 1) NWM20_MAX_DATE = datetime(2018, 12, 31, 23) @@ -227,28 +231,17 @@ def nwm_retro_to_parquet( if not output_dir.exists(): output_dir.mkdir(parents=True) - ds = xr.open_zarr( + da = xr.open_zarr( fsspec.get_mapper(s3_zarr_url, anon=True), consolidated=True - ).sel(feature_id=location_ids, time=slice(start_date, end_date)) - - # Fetch all at once - if chunk_by is None: - - da = ds[variable_name] - df = da_to_df(nwm_version, da) - min_time = df.value_time.min().strftime("%Y%m%d%HZ") - max_time = df.value_time.max().strftime("%Y%m%d%HZ") - output_filepath = Path( - output_parquet_dir, f"{min_time}_{max_time}.parquet" - ) - write_parquet_file(output_filepath, overwrite_output, df) - return + )[variable_name].sel( + feature_id=location_ids, time=slice(start_date, end_date) + ) # Fetch data by site if chunk_by == "location_id": for location_id in location_ids: - da = ds[variable_name].sel(feature_id=location_id) + da = da.sel(feature_id=location_id) df = da_to_df(nwm_version, da) min_time = df.value_time.min().strftime("%Y%m%d%HZ") max_time = df.value_time.max().strftime("%Y%m%d%HZ") @@ -259,26 +252,28 @@ def nwm_retro_to_parquet( write_parquet_file(output_filepath, overwrite_output, df) return - # 
Group dataset by day, week, month, or year - if chunk_by == "day": - gps = ds.groupby("time.day") + # Chunk data by time + periods = create_periods_based_on_chunksize( + start_date=start_date, + end_date=end_date, + chunk_by=chunk_by + ) - if chunk_by == "week": - # Calendar week: Monday to Sunday - gps = ds.groupby(ds.time.dt.isocalendar().week) + for period in periods: - if chunk_by == "month": - # Calendar month - gps = ds.groupby("time.month") + if period is not None: + dts = get_period_start_end_times( + period=period, + start_date=start_date, + end_date=end_date + ) + else: + dts = {"start_dt": start_date, "end_dt": end_date} - if chunk_by == "year": - # Calendar year - gps = ds.groupby("time.year") + da_i = da.sel(time=slice(dts["start_dt"], dts["end_dt"])) - # Process the data by selected chunk - for _, ds_i in gps: - df = da_to_df(nwm_version, ds_i[variable_name]) - output_filename = format_grouped_filename(ds_i) + df = da_to_df(nwm_version, da_i) + output_filename = format_grouped_filename(da_i) output_filepath = Path( output_parquet_dir, output_filename ) @@ -303,10 +298,10 @@ def nwm_retro_to_parquet( # nwm_version="nwm20", # variable_name="streamflow", # start_date=datetime(2000, 1, 1), -# end_date=datetime(2000, 1, 2), +# end_date=datetime(2000, 10, 2), # location_ids=LOCATION_IDS, # output_parquet_dir=Path(Path().home(), "temp", "nwm20_retrospective"), -# chunk_by="day", +# chunk_by="month", # ) # nwm_retro_to_parquet( diff --git a/src/teehr/loading/nwm/utils.py b/src/teehr/loading/nwm/utils.py index 5d86eef7..cb3c1225 100644 --- a/src/teehr/loading/nwm/utils.py +++ b/src/teehr/loading/nwm/utils.py @@ -21,7 +21,8 @@ SupportedKerchunkMethod ) from teehr.models.loading.utils import ( - SupportedNWMOperationalVersionsEnum + SupportedNWMOperationalVersionsEnum, + ChunkByEnum ) from teehr.loading.nwm.const import ( NWM_BUCKET, @@ -309,8 +310,8 @@ def gen_json( raise Exception(f"Corrupt file: {remote_path}") from err else: logger.warning( - ("A potentially corrupt file was encountered:") - (f"{remote_path}") + "A potentially corrupt file was encountered:" + f"{remote_path}" ) return None with open(outf, "wb") as f: @@ -573,3 +574,83 @@ def build_remote_nwm_filelist( component_paths = sorted([f"gcs://{path}" for path in component_paths]) return component_paths + + +def get_period_start_end_times( + period: pd.Period, + start_date: datetime, + end_date: datetime +) -> Dict[str, datetime]: + """Get the start and end times for a period, adjusting for the + start and end dates of the data ingest. + + Parameters + ---------- + period : pd.Period + The current period. + start_date : datetime + The start date of the data ingest. + end_date : datetime + Then end date of the data ingest. + + Returns + ------- + Dict[str, datetime] + The start and end times for the period. + """ + + start_dt = period.start_time + end_dt = period.end_time + + if start_date > period.start_time: + start_dt = start_date + + if (end_date < period.end_time) & (period.freq.name != "D"): + end_dt = end_date + + return {"start_dt": start_dt, "end_dt": end_dt} + + +def create_periods_based_on_chunksize( + start_date: datetime, + end_date: datetime, + chunk_by: Union[ChunkByEnum, None] +) -> List[pd.Period]: + """Create a list of periods based on the specified start and end dates + and the chunk size. + + Parameters + ---------- + start_date : datetime + The start date. + end_date : datetime + The end date. + chunk_by : Union[ChunkByEnum, None] + The chunk size frequency. 
+ + Returns + ------- + List[pd.Period] + A pandas period range. + """ + if chunk_by is None: + periods = [None] + + if chunk_by == "day": + periods = pd.period_range(start=start_date, end=end_date, freq="D") + + if chunk_by == "week": + periods = pd.period_range(start=start_date, end=end_date, freq="W") + + if chunk_by == "month": + periods = pd.period_range(start=start_date, end=end_date, freq="M") + + if chunk_by == "year": + periods = pd.period_range(start=start_date, end=end_date, freq="Y") + + if chunk_by == "location_id": + raise ValueError( + "A period range cannot be created based on location_id." + ) + + return periods diff --git a/src/teehr/loading/usgs/usgs.py b/src/teehr/loading/usgs/usgs.py index c80a7cfa..da79d0a4 100644 --- a/src/teehr/loading/usgs/usgs.py +++ b/src/teehr/loading/usgs/usgs.py @@ -7,9 +7,14 @@ from hydrotools.nwis_client.iv import IVDataService from teehr.models.loading.utils import ChunkByEnum from pydantic import validate_call, ConfigDict -from teehr.loading.nwm.utils import write_parquet_file +from teehr.loading.nwm.utils import ( + write_parquet_file, + get_period_start_end_times, + create_periods_based_on_chunksize +) DATETIME_STR_FMT = "%Y-%m-%dT%H:%M:00+0000" +DAYLIGHT_SAVINGS_PAD = timedelta(hours=2) def _filter_to_hourly(df: pd.DataFrame) -> pd.DataFrame: @@ -196,35 +201,20 @@ def usgs_to_parquet( if not output_dir.exists(): output_dir.mkdir(parents=True) - # Fetch all at once - if chunk_by is None: - usgs_df = _fetch_usgs( - sites=sites, - start_date=start_date, - end_date=end_date, - filter_to_hourly=filter_to_hourly, - filter_no_data=filter_no_data, - convert_to_si=convert_to_si - ) - if len(usgs_df) > 0: - output_filepath = Path(output_parquet_dir, "usgs.parquet") - write_parquet_file( - filepath=output_filepath, - overwrite_output=overwrite_output, - data=usgs_df - ) - return - if chunk_by == "location_id": for site in sites: usgs_df = _fetch_usgs( sites=[site], - start_date=start_date, - end_date=end_date, + start_date=start_date - DAYLIGHT_SAVINGS_PAD, + end_date=end_date + DAYLIGHT_SAVINGS_PAD, filter_to_hourly=filter_to_hourly, filter_no_data=filter_no_data, convert_to_si=convert_to_si ) + + usgs_df = usgs_df[(usgs_df["value_time"] >= start_date) & + (usgs_df["value_time"] < end_date)] + if len(usgs_df) > 0: output_filepath = Path( output_parquet_dir, @@ -237,51 +227,40 @@ def usgs_to_parquet( ) return - # TODO: Print warning if chunk_by is bigger than start and end dates? - - if chunk_by == "day": - date_intervals = pd.date_range(start_date, end_date, freq="D") - - if chunk_by == "week": - date_intervals = pd.date_range(start_date, end_date, freq="W") - - if chunk_by == "month": - date_intervals = pd.date_range(start_date, end_date, freq="M") - - if chunk_by == "year": - date_intervals = pd.date_range(start_date, end_date, freq="Y") - - # If the start date is not within the specified interval, - # it is not included, so add it here if it does not already exist. 
- date_intervals = date_intervals.union( - pd.DatetimeIndex([start_date]), sort=True + # Chunk data by time + periods = create_periods_based_on_chunksize( + start_date=start_date, + end_date=end_date, + chunk_by=chunk_by ) - for i, dt_intvl in enumerate(date_intervals): - if i == 0: - start_dt = start_date - else: - start_dt = dt_intvl + for period in periods: - if i == len(date_intervals) - 1: - # Include data for the last day - end_dt = end_date + timedelta(hours=24) - timedelta(minutes=1) + if period is not None: + dts = get_period_start_end_times( + period=period, + start_date=start_date, + end_date=end_date + ) else: - end_dt = date_intervals[i + 1] - timedelta(minutes=1) + dts = {"start_dt": start_date, "end_dt": end_date} usgs_df = _fetch_usgs( sites=sites, - start_date=start_dt, - end_date=end_dt, + start_date=dts["start_dt"] - DAYLIGHT_SAVINGS_PAD, + end_date=dts["end_dt"] + DAYLIGHT_SAVINGS_PAD, filter_to_hourly=filter_to_hourly, filter_no_data=filter_no_data, convert_to_si=convert_to_si ) + usgs_df = usgs_df[(usgs_df["value_time"] >= dts["start_dt"]) & + (usgs_df["value_time"] <= dts["end_dt"])] + if len(usgs_df) > 0: output_filename = _format_output_filename( - chunk_by, start_dt, end_dt + chunk_by, dts["start_dt"], dts["end_dt"] ) output_filepath = Path(output_parquet_dir, output_filename) diff --git a/tests/test_nwm_loading_utils.py b/tests/test_nwm_loading_utils.py index 2be912ad..d0349544 100644 --- a/tests/test_nwm_loading_utils.py +++ b/tests/test_nwm_loading_utils.py @@ -8,13 +8,16 @@ check_dates_against_nwm_version, build_remote_nwm_filelist, generate_json_paths, - get_dataset + get_dataset, + create_periods_based_on_chunksize ) from teehr.loading.nwm.const import ( NWM22_ANALYSIS_CONFIG, NWM30_ANALYSIS_CONFIG, ) +TIMEFORMAT = "%Y-%m-%d %H:%M:%S" + TEST_DIR = Path("tests", "data", "nwm30") TEMP_DIR = Path("tests", "data", "temp") @@ -151,6 +154,69 @@ def test_generate_json_for_bad_file(): ) +def test_create_periods_based_on_day(): + """Test creating periods based on daily chunksize.""" + start_date = "2023-12-30" + end_date = "2024-01-02" + chunk_by = "day" + + periods = create_periods_based_on_chunksize( + start_date=start_date, + end_date=end_date, + chunk_by=chunk_by + ) + assert periods[0].start_time.strftime(TIMEFORMAT) == "2023-12-30 00:00:00" + assert periods[0].end_time.strftime(TIMEFORMAT) == "2023-12-30 23:59:59" + assert periods[1].start_time.strftime(TIMEFORMAT) == "2023-12-31 00:00:00" + assert periods[1].end_time.strftime(TIMEFORMAT) == "2023-12-31 23:59:59" + assert periods[2].start_time.strftime(TIMEFORMAT) == "2024-01-01 00:00:00" + assert periods[2].end_time.strftime(TIMEFORMAT) == "2024-01-01 23:59:59" + assert periods[3].start_time.strftime(TIMEFORMAT) == "2024-01-02 00:00:00" + assert periods[3].end_time.strftime(TIMEFORMAT) == "2024-01-02 23:59:59" + + +def test_create_periods_based_on_week(): + """Test creating periods based on weekly chunksize.""" + start_date = "2023-12-30" + end_date = "2024-01-02" + chunk_by = "week" + periods = create_periods_based_on_chunksize( + start_date=start_date, + end_date=end_date, + chunk_by=chunk_by + ) + assert periods[0].start_time.strftime(TIMEFORMAT) == "2023-12-25 00:00:00" + assert periods[0].end_time.strftime(TIMEFORMAT) == "2023-12-31 23:59:59" + + +def test_create_periods_based_on_month(): + """Test creating periods based on monthly chunksize.""" + start_date = "2023-12-30" + end_date = "2024-01-02" + chunk_by = "month" + periods = create_periods_based_on_chunksize( + start_date=start_date, + 
end_date=end_date, + chunk_by=chunk_by + ) + assert periods[0].start_time.strftime(TIMEFORMAT) == "2023-12-01 00:00:00" + assert periods[0].end_time.strftime(TIMEFORMAT) == "2023-12-31 23:59:59" + + +def test_create_periods_based_on_year(): + """Test creating periods based on yearly chunksize.""" + start_date = "2023-12-30" + end_date = "2024-01-02" + chunk_by = "year" + periods = create_periods_based_on_chunksize( + start_date=start_date, + end_date=end_date, + chunk_by=chunk_by + ) + assert periods[0].start_time.strftime(TIMEFORMAT) == "2023-01-01 00:00:00" + assert periods[0].end_time.strftime(TIMEFORMAT) == "2023-12-31 23:59:59" + + if __name__ == "__main__": test_dates_and_nwm_version() test_building_nwm30_gcs_paths() @@ -158,3 +224,7 @@ def test_generate_json_for_bad_file(): test_generate_json_paths() test_point_zarr_reference_file() test_generate_json_for_bad_file() + test_create_periods_based_on_day() + test_create_periods_based_on_week() + test_create_periods_based_on_month() + test_create_periods_based_on_year() diff --git a/tests/test_usgs_loading.py b/tests/test_usgs_loading.py index 22db8292..b8518336 100644 --- a/tests/test_usgs_loading.py +++ b/tests/test_usgs_loading.py @@ -72,10 +72,10 @@ def test_chunkby_week(): chunk_by="week", overwrite_output=True ) - df = pd.read_parquet(Path(TEMP_DIR, "2023-02-20_2023-02-25.parquet")) - assert len(df) == 287 - df = pd.read_parquet(Path(TEMP_DIR, "2023-02-26_2023-03-03.parquet")) - assert len(df) == 266 + df = pd.read_parquet(Path(TEMP_DIR, "2023-02-20_2023-02-26.parquet")) + assert len(df) == 335 + df = pd.read_parquet(Path(TEMP_DIR, "2023-02-27_2023-03-03.parquet")) + assert len(df) == 186 def test_chunkby_month(): @@ -91,10 +91,10 @@ def test_chunkby_month(): chunk_by="month", overwrite_output=True ) - df = pd.read_parquet(Path(TEMP_DIR, "2023-02-20_2023-02-27.parquet")) - assert len(df) == 383 - df = pd.read_parquet(Path(TEMP_DIR, "2023-02-28_2023-03-25.parquet")) - assert len(df) == 1188 # missing final hour 23 + df = pd.read_parquet(Path(TEMP_DIR, "2023-02-20_2023-02-28.parquet")) + assert len(df) == 431 + df = pd.read_parquet(Path(TEMP_DIR, "2023-03-01_2023-03-25.parquet")) + assert len(df) == 1111 def test_chunkby_all(): diff --git a/version.txt b/version.txt index 5503126d..0b9c0199 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.3.10 +0.3.12
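
A minimal usage sketch of the new period-based chunking helpers added in `src/teehr/loading/nwm/utils.py` (assuming teehr 0.3.12, i.e. this patch, is installed). It shows how a fetch window that crosses a year boundary is split into month chunks and how each chunk is clamped to the requested start and end dates, which is the situation the old `groupby`/`date_range` approach handled incorrectly; the printed values are what the helpers are expected to return, not output copied from a run.

```python
from datetime import datetime

from teehr.loading.nwm.utils import (
    create_periods_based_on_chunksize,
    get_period_start_end_times,
)

# A fetch window that crosses a year boundary -- the case where grouping
# by calendar month/week could merge chunks from different years.
start_date = datetime(2023, 12, 30)
end_date = datetime(2024, 1, 2)

periods = create_periods_based_on_chunksize(
    start_date=start_date,
    end_date=end_date,
    chunk_by="month",
)

for period in periods:
    # Clamp each period to the requested window so the first and last
    # chunks do not reach outside start_date/end_date.
    dts = get_period_start_end_times(
        period=period,
        start_date=start_date,
        end_date=end_date,
    )
    print(period, dts["start_dt"], dts["end_dt"])
    # expected:
    # 2023-12  2023-12-30 00:00:00  2023-12-31 23:59:59.999999999
    # 2024-01  2024-01-01 00:00:00  2024-01-02 00:00:00
```

Each `dts` dict is then used to slice the retrospective dataset (or build the USGS request) for that chunk, so chunk boundaries follow the period range rather than a groupby key.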
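
The USGS loader in this patch also pads each request window by `DAYLIGHT_SAVINGS_PAD` (two hours) before calling the NWIS client and then trims the result back to the chunk boundaries on `value_time`. Below is a small sketch of that pad-and-trim step only, with a hypothetical stand-in DataFrame in place of the private `_fetch_usgs` call.

```python
from datetime import datetime, timedelta

import pandas as pd

# Hypothetical boundaries standing in for one chunk's start/end times.
start_dt = datetime(2023, 2, 20)
end_dt = datetime(2023, 2, 26, 23, 59, 59)
DAYLIGHT_SAVINGS_PAD = timedelta(hours=2)

# The padded window that would actually be sent to the NWIS client.
fetch_start = start_dt - DAYLIGHT_SAVINGS_PAD
fetch_end = end_dt + DAYLIGHT_SAVINGS_PAD

# Stand-in for the frame returned by the fetch step (hourly values).
usgs_df = pd.DataFrame({
    "value_time": pd.date_range(fetch_start, fetch_end, freq="h"),
    "value": 1.0,
})

# Trim back to the chunk boundaries so padded rows are not written out.
usgs_df = usgs_df[
    (usgs_df["value_time"] >= start_dt) & (usgs_df["value_time"] <= end_dt)
]
```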