diff --git a/src/bmi_troute.py b/src/bmi_troute.py new file mode 100644 index 000000000..5322f6fc8 --- /dev/null +++ b/src/bmi_troute.py @@ -0,0 +1,630 @@ +"""Basic Model Interface implementation for t-route.""" + +import numpy as np +import pandas as pd +from bmipy import Bmi +from pathlib import Path +import yaml +from datetime import datetime, timedelta + +import troute.main_utilities as tr +import troute.nhd_network_utilities_v02 as nnu + + + +class bmi_troute(Bmi): + + def __init__(self): + """Create a Bmi troute model that is ready for initialization.""" + super(bmi_troute, self).__init__() + #self._model = None + self._values = {} + #self._var_units = {} + self._var_loc = "node" + self._var_grid_id = 0 + #self._grids = {} + #self._grid_type = {} + + self._start_time = 0.0 + self._end_time = np.finfo("d").max + self._time_units = "s" + + #---------------------------------------------- + # Required, static attributes of the model + #---------------------------------------------- + _att_map = { + 'model_name': 'T-Route for Next Generation NWM', + 'version': '', + 'author_name': '', + 'grid_type': 'scalar', + 'time_step_size': 1, + #'time_step_type': 'donno', #unused + #'step_method': 'none', #unused + #'time_units': '1 hour' #NJF Have to drop the 1 for NGEN to recognize the unit + 'time_units': 'seconds' } + + #--------------------------------------------- + # Input variable names (CSDMS standard names) + #--------------------------------------------- + _input_var_names = ['land_surface_water_source__volume_flow_rate', + 'coastal_boundary__depth', #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + 'usgs_gage_observation__volume_flow_rate', #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + 'usace_gage_observation__volume_flow_rate', #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + 'rfc_gage_observation__volume_flow_rate' #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + ] + + #--------------------------------------------- + # Output variable names (CSDMS standard names) + #--------------------------------------------- + _output_var_names = ['channel_exit_water_x-section__volume_flow_rate', + 'channel_water_flow__speed', + 'channel_water__mean_depth', + 'lake_water~incoming__volume_flow_rate', + 'lake_water~outgoing__volume_flow_rate', + 'lake_surface__elevation' #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + ] + + #------------------------------------------------------ + # Create a Python dictionary that maps CSDMS Standard + # Names to the model's internal variable names. 
+    #------------------------------------------------------
+    _var_name_units_map = {
+        'channel_exit_water_x-section__volume_flow_rate':['streamflow_cms','m3 s-1'],
+        'channel_water_flow__speed':['streamflow_ms','m s-1'],
+        'channel_water__mean_depth':['streamflow_m','m'],
+        'lake_water~incoming__volume_flow_rate':['waterbody_cms','m3 s-1'],
+        'lake_water~outgoing__volume_flow_rate':['waterbody_cms','m3 s-1'],
+        'lake_surface__elevation':['waterbody_m','m'],
+        #-------------- Dynamic inputs --------------------------------
+        'land_surface_water_source__volume_flow_rate':['streamflow_cms','m3 s-1'],
+        'coastal_boundary__depth':['depth_m', 'm'],
+        'usgs_gage_observation__volume_flow_rate':['streamflow_cms','m3 s-1'],
+        'usace_gage_observation__volume_flow_rate':['streamflow_cms','m3 s-1'],
+        'rfc_gage_observation__volume_flow_rate':['streamflow_cms','m3 s-1']
+        }
+
+    #------------------------------------------------------
+    # A list of static attributes. Not all these need to be used.
+    #------------------------------------------------------
+    _static_attributes_list = []
+
+
+    #------------------------------------------------------------
+    #------------------------------------------------------------
+    # BMI: Model Control Functions
+    #------------------------------------------------------------
+    #------------------------------------------------------------
+
+    #-------------------------------------------------------------------
+    def initialize(self, bmi_cfg_file=None):
+
+        args = tr._handle_args_v03(['-f', bmi_cfg_file])
+
+        # -------------- Read in the BMI configuration -------------------------#
+        bmi_cfg_file = Path(bmi_cfg_file)
+        # ----- Create some lookup tables from the long variable names --------#
+        self._var_name_map_long_first = {long_name:self._var_name_units_map[long_name][0] for \
+                                         long_name in self._var_name_units_map.keys()}
+        self._var_name_map_short_first = {self._var_name_units_map[long_name][0]:long_name for \
+                                          long_name in self._var_name_units_map.keys()}
+        self._var_units_map = {long_name:self._var_name_units_map[long_name][1] for \
+                               long_name in self._var_name_units_map.keys()}
+
+        # The configuration file directs everything that follows.
+        if bmi_cfg_file is not None:
+
+            with bmi_cfg_file.open('r') as fp:
+                cfg = yaml.safe_load(fp)
+            self._cfg_bmi = self._parse_config(cfg)
+        else:
+            print("Error: No configuration provided, nothing to do...")
+
+        # ------------- Initialize t-route model ------------------------------#
+        (self._network,
+         self._log_parameters,
+         self._preprocessing_parameters,
+         self._supernetwork_parameters,
+         self._waterbody_parameters,
+         self._compute_parameters,
+         self._forcing_parameters,
+         self._restart_parameters,
+         self._hybrid_parameters,
+         self._output_parameters,
+         self._parity_parameters,
+         self._data_assimilation_parameters,
+         self._run_parameters,
+        ) = tr.initialize_network(args)
+
+        # Set number of routing time steps per update call (12 steps = 1 hour at a 300 s dt)
+        self._nts = 12
+
+        # -------------- Initialize all the variables --------------------------#
+        # -------------- so that they'll be picked up with the get functions --#
+        for var_name in list(self._var_name_units_map.keys()):
+            # ---------- All the variables are single values ------------------#
+            # ---------- so just set to zero for now. 
------------------# + self._values[var_name] = np.zeros(self._network.dataframe.shape[0]) + setattr( self, var_name, 0 ) + + ''' + # -------------- Update dimensions of DA variables --------------------# + #################################### + # Maximum lookback hours from reservoir configurations + usgs_shape = self._network._waterbody_types_df[self._network._waterbody_types_df['reservoir_type']==2].shape[0] + usace_shape = self._network._waterbody_types_df[self._network._waterbody_types_df['reservoir_type']==3].shape[0] + rfc_shape = self._network._waterbody_types_df[self._network._waterbody_types_df['reservoir_type']==4].shape[0] + + max_lookback_hrs = max(self._data_assimilation_parameters.get('timeslice_lookback_hours'), + self._waterbody_parameters.get('rfc').get('reservoir_rfc_forecasts_lookback_hours')) + + self._values['usgs_gage_observation__volume_flow_rate'] = np.zeros((usgs_shape,max_lookback_hrs*4)) + setattr( self, 'usgs_gage_observation__volume_flow_rate', 0 ) + self._values['usace_gage_observation__volume_flow_rate'] = np.zeros((usace_shape,max_lookback_hrs*4)) + setattr( self, 'usace_gage_observation__volume_flow_rate', 0 ) + self._values['rfc_gage_observation__volume_flow_rate'] = np.zeros((rfc_shape,max_lookback_hrs*4)) + setattr( self, 'rfc_gage_observation__volume_flow_rate', 0 ) + ''' + + self._start_time = 0.0 + self._end_time = self._forcing_parameters.get('dt') * self._forcing_parameters.get('nts') + self._time = 0.0 + self._time_step = self._forcing_parameters.get('dt') + self._time_units = 's' + + def update(self): + """Advance model by one time step.""" + + # Set input data into t-route objects + self._network._qlateral = pd.DataFrame(self._values['land_surface_water_source__volume_flow_rate'], + index=self._network.dataframe.index.to_numpy()) + self._network._coastal_boundary_depth_df = pd.DataFrame(self._values['coastal_boundary__depth']) + + ''' + ( + self._run_sets, + self._da_sets, + self._parity_sets + ) = tr.build_run_sets(self._network, + self._forcing_parameters, + self._compute_parameters, + self._data_assimilation_parameters, + self._output_parameters, + self._parity_parameters) + + # Create forcing data within network object for first loop iteration + self._network.assemble_forcings( + self._run_sets[0], + self._forcing_parameters, + self._hybrid_parameters, + self._compute_parameters.get('cpu_pool', None)) + ''' + # Create data assimilation object from da_sets for first loop iteration + ###NOTE: this is just a place holder, setting DA variables will be done with set_values... 
+        self._data_assimilation = tr.build_data_assimilation(
+            self._network,
+            self._data_assimilation_parameters,
+            self._waterbody_parameters,
+            [], #self._da_sets[0],
+            self._forcing_parameters,
+            self._compute_parameters)
+
+        '''
+        self._run_sets[0]['t0'] = self._network.t0
+        self._run_sets[0]['dt'] = self._forcing_parameters.get('dt')
+        '''
+
+        # Run routing
+        '''
+        self._run_results = tr.run_routing(
+            self._network,
+            self._data_assimilation,
+            [self._run_sets[0]],
+            self._da_sets,
+            self._compute_parameters,
+            self._forcing_parameters,
+            self._waterbody_parameters,
+            self._output_parameters,
+            self._hybrid_parameters,
+            self._data_assimilation_parameters,
+            self._run_parameters,
+            self._parity_sets)
+        '''
+
+        (
+        self._run_results,
+        self._subnetwork_list
+        ) = tr.nwm_route(self._network.connections,
+                         self._network.reverse_network,
+                         self._network.waterbody_connections,
+                         self._network._reaches_by_tw,
+                         self._compute_parameters.get('parallel_compute_method','serial'),
+                         self._compute_parameters.get('compute_kernel'),
+                         self._compute_parameters.get('subnetwork_target_size'),
+                         self._compute_parameters.get('cpu_pool'),
+                         self._network.t0,
+                         self._time_step,
+                         self._nts,
+                         self._forcing_parameters.get('qts_subdivisions', 12),
+                         self._network.independent_networks,
+                         self._network.dataframe,
+                         self._network.q0,
+                         self._network._qlateral,
+                         self._data_assimilation.usgs_df,
+                         self._data_assimilation.lastobs_df,
+                         self._data_assimilation.reservoir_usgs_df,
+                         self._data_assimilation.reservoir_usgs_param_df,
+                         self._data_assimilation.reservoir_usace_df,
+                         self._data_assimilation.reservoir_usace_param_df,
+                         self._data_assimilation.assimilation_parameters,
+                         self._compute_parameters.get('assume_short_ts', False),
+                         self._compute_parameters.get('return_courant', False),
+                         self._network._waterbody_df,
+                         self._waterbody_parameters,
+                         self._network._waterbody_types_df,
+                         self._network.waterbody_type_specified,
+                         self._network.diffusive_network_data,
+                         self._network.topobathy_df,
+                         self._network.refactored_diffusive_domain,
+                         self._network.refactored_reaches,
+                         [], #subnetwork_list,
+                         self._network.coastal_boundary_depth_df,
+                         self._network.unrefactored_topobathy_df,)
+
+        # update initial conditions with results output
+        self._network.new_nhd_q0(self._run_results)
+        self._network.update_waterbody_water_elevation()
+
+        # update t0
+        self._network.new_t0(self._time_step,self._nts)
+
+        # get reservoir DA initial parameters for next loop iteration
+        self._data_assimilation.update(self._run_results,
+                                       self._data_assimilation_parameters,
+                                       self._run_parameters,
+                                       self._network,
+                                       [], #self._da_sets[run_set_iterator + 1]
+                                       )
+
+        (self._values['channel_exit_water_x-section__volume_flow_rate'],
+         self._values['channel_water_flow__speed'],
+         self._values['channel_water__mean_depth'],
+         self._values['lake_water~incoming__volume_flow_rate'],
+         self._values['lake_water~outgoing__volume_flow_rate'],
+         self._values['lake_surface__elevation'],
+        ) = tr.create_output_dataframes(
+            self._run_results,
+            self._nts,
+            self._network._waterbody_df,
+            self._network.link_lake_crosswalk)
+
+        self._time += self._time_step * self._nts
+
+    def update_frac(self, time_frac):
+        """Update model by a fraction of a time step.
+        Parameters
+        ----------
+        time_frac : float
+            Fraction of a time step.
+        """
+        time_step = self.get_time_step()
+        # temporarily shorten the step; this class keeps the step size on
+        # self._time_step (there is no separate self._model object)
+        self._time_step = time_frac * time_step
+        self.update()
+        self._time_step = time_step
+
+    def update_until(self, then):
+        """Update model until a particular time.
+        Parameters
+        ----------
+        then : float
+            Time to run model until in seconds.
+        """
+        n_steps = int((then - self.get_current_time()) / self.get_time_step())
+
+        full_nts = self._nts
+        self._nts = n_steps
+
+        self.update()
+
+        self._nts = full_nts - n_steps
+
+        '''
+        for _ in range(int(n_steps)):
+            self.update()
+        self.update_frac(n_steps - int(n_steps))
+        '''
+
+    def finalize(self):
+        """Finalize model."""
+
+        self._values = None
+        self._var_loc = None
+        self._var_grid_id = None
+        self._var_name_map_long_first = None
+        self._var_name_map_short_first = None
+        self._var_units_map = None
+        self._cfg_bmi = None
+        self._network = None
+        self._log_parameters = None
+        self._preprocessing_parameters = None
+        self._supernetwork_parameters = None
+        self._waterbody_parameters = None
+        self._compute_parameters = None
+        self._forcing_parameters = None
+        self._restart_parameters = None
+        self._hybrid_parameters = None
+        self._output_parameters = None
+        self._parity_parameters = None
+        self._data_assimilation_parameters = None
+        self._run_parameters = None
+        self._nts = None
+        self._start_time = None
+        self._end_time = None
+        self._time = None
+        self._time_step = None
+        self._time_units = None
+        self._data_assimilation = None
+        self._run_results = None
+        self._subnetwork_list = None
+
+    def get_var_type(self, var_name):
+        """Data type of variable.
+        Parameters
+        ----------
+        var_name : str
+            Name of variable as CSDMS Standard Name.
+        Returns
+        -------
+        str
+            Data type.
+        """
+        return str(self.get_value_ptr(var_name).dtype)
+
+    def get_var_units(self, var_name):
+        """Get units of variable.
+        Parameters
+        ----------
+        var_name : str
+            Name of variable as CSDMS Standard Name.
+        Returns
+        -------
+        str
+            Variable units.
+        """
+        # the units lookup is built in initialize() from _var_name_units_map
+        return self._var_units_map[var_name]
+
+    def get_var_nbytes(self, var_name):
+        """Get size of variable data array in bytes.
+        Parameters
+        ----------
+        var_name : str
+            Name of variable as CSDMS Standard Name.
+        Returns
+        -------
+        int
+            Size of data array in bytes.
+        """
+        return self.get_value_ptr(var_name).nbytes
+
+    def get_var_itemsize(self, name):
+        return np.dtype(self.get_var_type(name)).itemsize
+
+    def get_var_location(self, name):
+        # all variables share the same location ("node")
+        return self._var_loc
+
+    def get_var_grid(self, var_name):
+        """Grid id for a variable.
+        Parameters
+        ----------
+        var_name : str
+            Name of variable as CSDMS Standard Name.
+        Returns
+        -------
+        int
+            Grid id.
+        """
+        # every variable lives on the single grid defined in __init__
+        return self._var_grid_id
+
+    def get_grid_rank(self, grid_id):
+        """Rank of grid.
+        Parameters
+        ----------
+        grid_id : int
+            Identifier of a grid.
+        Returns
+        -------
+        int
+            Rank of grid.
+        """
+        # all value arrays are 1-D vectors over network segments
+        return 1
+
+    def get_grid_size(self, grid_id):
+        """Size of grid.
+        Parameters
+        ----------
+        grid_id : int
+            Identifier of a grid.
+        Returns
+        -------
+        int
+            Size of grid.
+        """
+        # every value array shares the same length: one element per segment
+        return len(next(iter(self._values.values())))
+
+    def get_value_ptr(self, var_name):
+        """Reference to values.
+        Parameters
+        ----------
+        var_name : str
+            Name of variable as CSDMS Standard Name.
+        Returns
+        -------
+        array_like
+            Value array.
+        """
+        return self._values[var_name]
+
+    def get_value(self, var_name):
+        """Copy of values.
+        Parameters
+        ----------
+        var_name : str
+            Name of variable as CSDMS Standard Name.
+        Returns
+        -------
+        array_like
+            Copy of values.
+        """
+        return self.get_value_ptr(var_name).copy()
+
+    def get_value_at_indices(self, var_name, dest, indices):
+        """Get values at particular indices.
+        Parameters
+        ----------
+        var_name : str
+            Name of variable as CSDMS Standard Name.
+        dest : ndarray
+            A numpy array into which to place the values.
+        indices : array_like
+            Array of indices.
+        Returns
+        -------
+        array_like
+            Values at indices.
+        """
+        dest[:] = self.get_value_ptr(var_name).take(indices)
+        return dest
+
+    def set_value(self, var_name, src):
+        """
+        Set model values
+
+        Parameters
+        ----------
+        var_name : str
+            Name of variable as CSDMS Standard Name.
+        src : array_like
+            Array of new values.
+        """
+        val = self.get_value_ptr(var_name)
+        val[:] = src.reshape(val.shape)
+
+        #self._values[var_name] = src
+
+    def set_value_at_indices(self, name, inds, src):
+        """Set model values at particular indices.
+        Parameters
+        ----------
+        name : str
+            Name of variable as CSDMS Standard Name.
+        inds : array_like
+            Array of indices.
+        src : array_like
+            Array of new values.
+        """
+        val = self.get_value_ptr(name)
+        val.flat[inds] = src
+
+    def get_component_name(self):
+        """Name of the component."""
+        return self._att_map['model_name']
+
+    def get_input_item_count(self):
+        """Get number of input variables."""
+        return len(self._input_var_names)
+
+    def get_output_item_count(self):
+        """Get number of output variables."""
+        return len(self._output_var_names)
+
+    def get_input_var_names(self):
+        """Get names of input variables."""
+        return self._input_var_names
+
+    def get_output_var_names(self):
+        """Get names of output variables."""
+        return self._output_var_names
+
+    def get_grid_shape(self, grid_id, shape):
+        """Shape of the grid."""
+        # all variables share one grid, so any value array supplies its shape
+        var_name = next(iter(self._values))
+        shape[:] = self.get_value_ptr(var_name).shape
+        return shape
+
+    def get_grid_spacing(self, grid_id, spacing):
+        """Spacing of rows and columns of uniform rectilinear grid."""
+        # not applicable to an unstructured channel network
+        raise NotImplementedError("get_grid_spacing")
+
+    def get_grid_origin(self, grid_id, origin):
+        """Origin of uniform rectilinear grid."""
+        # not applicable to an unstructured channel network
+        raise NotImplementedError("get_grid_origin")
+
+    def get_grid_type(self, grid_id):
+        """Type of grid."""
+        return self._att_map['grid_type']
+
+    def get_start_time(self):
+        """Start time of model."""
+        return self._start_time
+
+    def get_end_time(self):
+        """End time of model."""
+        return self._end_time
+
+    def get_current_time(self):
+        return self._time
+
+    def get_time_step(self):
+        return self._time_step
+
+    def get_time_units(self):
+        return self._time_units
+
+    def get_grid_edge_count(self, grid):
+        raise NotImplementedError("get_grid_edge_count")
+
+    def get_grid_edge_nodes(self, grid, edge_nodes):
+        raise NotImplementedError("get_grid_edge_nodes")
+
+    def get_grid_face_count(self, grid):
+        raise NotImplementedError("get_grid_face_count")
+
+    def get_grid_face_nodes(self, grid, face_nodes):
+        raise NotImplementedError("get_grid_face_nodes")
+
+    def get_grid_node_count(self, grid):
+        """Number of grid nodes.
+        Parameters
+        ----------
+        grid : int
+            Identifier of a grid.
+        Returns
+        -------
+        int
+            Size of grid. 
+ """ + return self.get_grid_size(grid) + + def get_grid_nodes_per_face(self, grid, nodes_per_face): + raise NotImplementedError("get_grid_nodes_per_face") + + def get_grid_face_edges(self, grid, face_edges): + raise NotImplementedError("get_grid_face_edges") + + def get_grid_x(self, grid, x): + raise NotImplementedError("get_grid_x") + + def get_grid_y(self, grid, y): + raise NotImplementedError("get_grid_y") + + def get_grid_z(self, grid, z): + raise NotImplementedError("get_grid_z") + + def _parse_config(self, cfg): + cfg_list = [cfg.get('flag'),cfg.get('file')] + return cfg_list \ No newline at end of file diff --git a/src/troute-network/troute/hyfeature_network_utilities.py b/src/troute-network/troute/hyfeature_network_utilities.py index e1fc65860..15e89712a 100644 --- a/src/troute-network/troute/hyfeature_network_utilities.py +++ b/src/troute-network/troute/hyfeature_network_utilities.py @@ -11,6 +11,7 @@ from joblib import delayed, Parallel import pyarrow as pa import pyarrow.parquet as pq +import xarray as xr import troute.nhd_io as nhd_io @@ -23,11 +24,12 @@ def build_forcing_sets( t0 ): - run_sets = forcing_parameters.get("qlat_forcing_sets", None) - nexus_input_folder = forcing_parameters.get("nexus_input_folder", None) - nts = forcing_parameters.get("nts", None) - max_loop_size = forcing_parameters.get("max_loop_size", 12) - dt = forcing_parameters.get("dt", None) + run_sets = forcing_parameters.get("qlat_forcing_sets", None) + nexus_input_folder = forcing_parameters.get("nexus_input_folder", None) + downstream_boundary_input_folder = forcing_parameters.get("downstream_boundary_input_folder", None) + nts = forcing_parameters.get("nts", None) + max_loop_size = forcing_parameters.get("max_loop_size", 12) + dt = forcing_parameters.get("dt", None) try: nexus_input_folder = pathlib.Path(nexus_input_folder) @@ -38,6 +40,7 @@ def build_forcing_sets( raise AssertionError("Aborting simulation because the nexus_input_folder:", qlat_input_folder,"does not exist. 
Please check the the nexus_input_folder variable is correctly entered in the .yaml control file") from None
 
     forcing_glob_filter = forcing_parameters.get("nexus_file_pattern_filter", "*.NEXOUT")
+    downstream_boundary_glob_filter = forcing_parameters.get("downstream_boundary_file_pattern_filter", "*SCHISM.nc")
 
     if forcing_glob_filter=="nex-*":
         print("Reformating qlat nexus files as hourly binary files...")
@@ -58,7 +61,7 @@ def build_forcing_sets(
         nexus_input_folder, forcing_glob_filter = nex_files_to_binary(nexus_files_list, binary_folder)
         forcing_parameters["nexus_input_folder"] = nexus_input_folder
         forcing_parameters["nexus_file_pattern_filter"] = forcing_glob_filter
-    
+
     # TODO: Throw errors if insufficient input data are available
     if run_sets:
         #FIXME: Change it for hyfeature
@@ -136,7 +139,7 @@ def build_forcing_sets(
             if k + max_loop_size < len(forcing_filename_list):
                 run_sets[j]['nexus_files'] = forcing_filename_list[k:k
-                                                                 + max_loop_size]
+                                                                   + max_loop_size]
             else:
                 run_sets[j]['nexus_files'] = forcing_filename_list[k:]
@@ -161,6 +164,81 @@ def build_forcing_sets(
         k += max_loop_size
         j += 1
 
+    # create downstream boundary forcing file list
+    if downstream_boundary_input_folder:
+        # get the first and second files from an ordered list of all forcing files
+        downstream_boundary_input_folder = pathlib.Path(downstream_boundary_input_folder)
+        all_files = sorted(downstream_boundary_input_folder.glob(downstream_boundary_glob_filter))
+        first_file = all_files[0]
+        second_file = all_files[1]
+
+        # Deduce the time interval of the forcing data from the output timestamps of the first two files
+        df = read_file(first_file)
+        t1_str = pd.to_datetime(df.time.iloc[0]).strftime("%Y-%m-%d_%H:%M:%S")
+        t1 = datetime.strptime(t1_str,"%Y-%m-%d_%H:%M:%S")
+        df = read_file(second_file)
+        t2_str = pd.to_datetime(df.time.iloc[0]).strftime("%Y-%m-%d_%H:%M:%S")
+        t2 = datetime.strptime(t2_str,"%Y-%m-%d_%H:%M:%S")
+        dt_downstream_boundary_timedelta = t2 - t1
+        dt_downstream_boundary = dt_downstream_boundary_timedelta.seconds
+
+        bts_subdivisions = dt_downstream_boundary / dt
+        #forcing_parameters['qts_subdivisions']= qts_subdivisions
+
+        # the number of files required for the simulation
+        nfiles = int(np.ceil(nts / bts_subdivisions))
+
+        # list of downstream boundary file datetimes
+        datetime_list = [t0 + dt_downstream_boundary_timedelta * n for n in
+                         range(nfiles)]
+        datetime_list_str = [datetime.strftime(d, '%Y%m%d%H%M') for d in
+                             datetime_list]
+        # list of downstream boundary files
+        downstream_boundary_filename_list = [d_str + downstream_boundary_glob_filter[1:] for d_str in
+                                             datetime_list_str]
+
+        # check that all forcing files exist
+        for f in downstream_boundary_filename_list:
+            try:
+                J = pathlib.Path(downstream_boundary_input_folder.joinpath(f))
+                assert J.is_file()
+            except AssertionError:
+                raise AssertionError("Aborting simulation because downstream boundary file", J, "cannot be found.") from None
+
+        # build run sets list
+        #run_sets = []
+        k = 0
+        j = 0
+        nts_accum = 0
+        nts_last = 0
+        while k < len(downstream_boundary_filename_list):
+            #run_sets.append({})
+
+            if k + max_loop_size < len(downstream_boundary_filename_list):
+                run_sets[j]['downstream_boundary_files'] = downstream_boundary_filename_list[k:k
+                                                                                             + max_loop_size]
+            else:
+                run_sets[j]['downstream_boundary_files'] = downstream_boundary_filename_list[k:]
+
+            #nts_accum += len(run_sets[j]['nexus_files']) * qts_subdivisions
+            #if nts_accum <= nts:
+            #    run_sets[j]['nts'] = int(len(run_sets[j]['nexus_files'])
+            #                             * qts_subdivisions)
+            #else: 
+ # run_sets[j]['nts'] = int(nts - nts_last) + + #final_nexout = nexus_input_folder.joinpath(run_sets[j]['nexus_files' + # ][-1]) + #df = read_file(final_nexout) + #final_timestamp_str = pd.to_datetime(df.columns[1]).strftime("%Y-%m-%d_%H:%M:%S") + + #run_sets[j]['final_timestamp'] = \ + # datetime.strptime(final_timestamp_str, '%Y-%m-%d_%H:%M:%S') + + #nts_last = nts_accum + k += max_loop_size + j += 1 + return run_sets def build_qlateral_array( @@ -296,5 +374,7 @@ def read_file(file_name): elif extension=='.parquet': df = pq.read_table(file_name).to_pandas().reset_index() df.index.name = None - + elif extension=='.nc': + df = xr.open_dataset(file_name) + df = df.to_dataframe() return df \ No newline at end of file diff --git a/src/troute-network/troute/hyfeature_preprocess.py b/src/troute-network/troute/hyfeature_preprocess.py index 446f356df..8bfa47119 100644 --- a/src/troute-network/troute/hyfeature_preprocess.py +++ b/src/troute-network/troute/hyfeature_preprocess.py @@ -23,7 +23,7 @@ def build_hyfeature_network(supernetwork_parameters, waterbody_parameters, ): - + geo_file_path = supernetwork_parameters["geo_file_path"] cols = supernetwork_parameters["columns"] terminal_code = supernetwork_parameters.get("terminal_code", 0) @@ -692,17 +692,17 @@ def hyfeature_forcing( ----- """ - + # Unpack user-specified forcing parameters - dt = forcing_parameters.get("dt", None) - qts_subdivisions = forcing_parameters.get("qts_subdivisions", None) - nexus_input_folder = forcing_parameters.get("nexus_input_folder", None) - qlat_file_index_col = forcing_parameters.get("qlat_file_index_col", "feature_id") - qlat_file_value_col = forcing_parameters.get("qlat_file_value_col", "q_lateral") - qlat_file_gw_bucket_flux_col = forcing_parameters.get("qlat_file_gw_bucket_flux_col", "qBucket") - qlat_file_terrain_runoff_col = forcing_parameters.get("qlat_file_terrain_runoff_col", "qSfcLatRunoff") - - + dt = forcing_parameters.get("dt", None) + qts_subdivisions = forcing_parameters.get("qts_subdivisions", None) + nexus_input_folder = forcing_parameters.get("nexus_input_folder", None) + qlat_file_index_col = forcing_parameters.get("qlat_file_index_col", "feature_id") + qlat_file_value_col = forcing_parameters.get("qlat_file_value_col", "q_lateral") + qlat_file_gw_bucket_flux_col = forcing_parameters.get("qlat_file_gw_bucket_flux_col", "qBucket") + qlat_file_terrain_runoff_col = forcing_parameters.get("qlat_file_terrain_runoff_col", "qSfcLatRunoff") + downstream_boundary_input_folder = forcing_parameters.get("downstream_boundary_input_folder", None) + # TODO: find a better way to deal with these defaults and overrides. 
run["t0"] = run.get("t0", t0) run["nts"] = run.get("nts") @@ -712,7 +712,8 @@ def hyfeature_forcing( run["qlat_file_index_col"] = run.get("qlat_file_index_col", qlat_file_index_col) run["qlat_file_value_col"] = run.get("qlat_file_value_col", qlat_file_value_col) run["qlat_file_gw_bucket_flux_col"] = run.get("qlat_file_gw_bucket_flux_col", qlat_file_gw_bucket_flux_col) - run["qlat_file_terrain_runoff_col"] = run.get("qlat_file_terrain_runoff_col", qlat_file_terrain_runoff_col) + run["qlat_file_terrain_runoff_col"] = run.get("qlat_file_terrain_runoff_col", qlat_file_terrain_runoff_col) + run["downstream_boundary_input_folder"] = run.get("downstream_boundary_input_folder", downstream_boundary_input_folder) #--------------------------------------------------------------------------- # Assemble lateral inflow data @@ -749,12 +750,19 @@ def hyfeature_forcing( start_time = time.time() LOG.info("creating coastal dataframe ...") - coastal_boundary_domain = nhd_io.read_coastal_boundary_domain(coastal_boundary_domain_files) + coastal_boundary_domain = nhd_io.read_coastal_boundary_domain(coastal_boundary_domain_files) + # create dataframe for hourly schism data + coastal_boundary_depth_df = nhd_io.build_coastal_dataframe( + run, + coastal_boundary_domain, + ) + # create dataframe for multi hourly schism data + ''' coastal_boundary_depth_df = nhd_io.build_coastal_ncdf_dataframe( coastal_boundary_elev_files, coastal_boundary_domain, ) - + ''' LOG.debug( "coastal boundary elevation observation DataFrame creation complete in %s seconds." \ % (time.time() - start_time) diff --git a/src/troute-network/troute/main_utilities.py b/src/troute-network/troute/main_utilities.py new file mode 100644 index 000000000..6f80c2059 --- /dev/null +++ b/src/troute-network/troute/main_utilities.py @@ -0,0 +1,611 @@ +import argparse +import time +import logging +import pandas as pd +import numpy as np +from datetime import timedelta +#from .input import _input_handler_v03 +import troute.nhd_network_utilities_v02 as nnu +import troute.hyfeature_network_utilities as hnu +import troute.nhd_io as nhd_io + +#from .output import nwm_output_generator +from troute.NHDNetwork import NHDNetwork +from troute.HYFeaturesNetwork import HYFeaturesNetwork +from troute.DataAssimilation import AllDA +from troute.routing.compute import compute_nhd_routing_v02, compute_diffusive_routing + +LOG = logging.getLogger('') + +def initialize_network(args): + + # unpack user inputs + ( + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + ) = _input_handler_v03(args) + + run_parameters = { + 'dt': forcing_parameters.get('dt'), + 'nts': forcing_parameters.get('nts'), + 'cpu_pool': compute_parameters.get('cpu_pool') + } + + showtiming = log_parameters.get("showtiming", None) + if showtiming: + task_times = {} + task_times['forcing_time'] = 0 + task_times['route_time'] = 0 + task_times['output_time'] = 0 + main_start_time = time.time() + + cpu_pool = compute_parameters.get("cpu_pool", None) + + # Build routing network data objects. Network data objects specify river + # network connectivity, channel geometry, and waterbody parameters. Also + # perform initial warmstate preprocess. 
+ if showtiming: + network_start_time = time.time() + + #if "ngen_nexus_file" in supernetwork_parameters: + if supernetwork_parameters["geo_file_type"] == 'HYFeaturesNetwork': + network = HYFeaturesNetwork(supernetwork_parameters, + waterbody_parameters, + restart_parameters, + forcing_parameters, + verbose=True, showtiming=False) + + network.create_routing_network(network.connections, + network.dataframe, + network.waterbody_connections, + network.gages, + preprocessing_parameters, + compute_parameters, + waterbody_parameters,) + elif supernetwork_parameters["geo_file_type"] == 'NHDNetwork': + network = NHDNetwork(supernetwork_parameters=supernetwork_parameters, + waterbody_parameters=waterbody_parameters, + restart_parameters=restart_parameters, + forcing_parameters=forcing_parameters, + compute_parameters=compute_parameters, + data_assimilation_parameters=data_assimilation_parameters, + preprocessing_parameters=preprocessing_parameters, + verbose=True, + showtiming=False #showtiming, + ) + + if showtiming: + network_end_time = time.time() + task_times['network_time'] = network_end_time - network_start_time + + all_parameters = [log_parameters, #TODO: Delete this... + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + run_parameters + ] + + return (network, + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + run_parameters) + +def build_run_sets(network, supernetwork_parameters, forcing_parameters, compute_parameters, + data_assimilation_parameters, output_parameters, parity_parameters): + + # Create run_sets: sets of forcing files for each loop + if supernetwork_parameters["geo_file_type"] == 'NHDNetwork': + run_sets = nnu.build_forcing_sets(forcing_parameters, network.t0) + elif supernetwork_parameters["geo_file_type"] == 'HYFeaturesNetwork': + run_sets = hnu.build_forcing_sets(forcing_parameters, network.t0) + + # Create da_sets: sets of TimeSlice files for each loop + if "data_assimilation_parameters" in compute_parameters: + da_sets = nnu.build_da_sets(data_assimilation_parameters, run_sets, network.t0) + + # Create parity_sets: sets of CHRTOUT files against which to compare t-route flows + if "wrf_hydro_parity_check" in output_parameters: + parity_sets = nnu.build_parity_sets(parity_parameters, run_sets) + else: + parity_sets = [] + + return run_sets, da_sets, parity_sets + +def build_forcings(network, run, forcing_parameters, hybrid_parameters, compute_parameters): + + cpu_pool = compute_parameters.get('cpu_pool', None) + # Create forcing data within network object for first loop iteration + network.assemble_forcings(run, forcing_parameters, hybrid_parameters, cpu_pool) + + return network + +def build_data_assimilation(network, data_assimilation_parameters, waterbody_parameters, da_run, forcing_parameters, compute_parameters): + + #FIXME: hack to get run_parameters. This is done in input_handler_v2. Probably need + # to find a better way to do this here though... 
+    if 'run_parameters' not in locals():
+        run_parameters = {'dt': forcing_parameters.get('dt'),
+                          'nts': forcing_parameters.get('nts'),
+                          'cpu_pool': compute_parameters.get('cpu_pool', None)}
+
+    # Create data assimilation object from da_sets for first loop iteration
+    data_assimilation = AllDA(data_assimilation_parameters,
+                              run_parameters,
+                              waterbody_parameters,
+                              network,
+                              da_run)
+
+#    if showtiming:
+#        forcing_end_time = time.time()
+#        task_times['forcing_time'] += forcing_end_time - network_end_time
+
+    return data_assimilation
+
+def run_routing(network, data_assimilation, run_sets, da_sets, compute_parameters, forcing_parameters, waterbody_parameters,
+                output_parameters, hybrid_parameters, data_assimilation_parameters, run_parameters, parity_sets):
+    '''
+    Run the routing computation over all run sets, updating initial conditions
+    and data assimilation state between loop iterations.
+    '''
+    parallel_compute_method = compute_parameters.get("parallel_compute_method", None)
+    subnetwork_target_size = compute_parameters.get("subnetwork_target_size", 1)
+    qts_subdivisions = forcing_parameters.get("qts_subdivisions", 1)
+    compute_kernel = compute_parameters.get("compute_kernel", "V02-caching")
+    assume_short_ts = compute_parameters.get("assume_short_ts", False)
+    return_courant = compute_parameters.get("return_courant", False)
+    cpu_pool = compute_parameters.get("cpu_pool", 1)
+
+    # Pass an empty subnetwork list to nwm_route. These objects are calculated/populated
+    # on the first iteration of the for loop only; subsequent iterations receive the
+    # list produced by the initial loop.
+    subnetwork_list = [None, None, None]
+
+    for run_set_iterator, run in enumerate(run_sets):
+
+        t0 = run.get("t0")
+        dt = run.get("dt")
+        nts = run.get("nts")
+
+        if parity_sets:
+            parity_sets[run_set_iterator]["dt"] = dt
+            parity_sets[run_set_iterator]["nts"] = nts
+
+#        if showtiming:
+#            route_start_time = time.time()
+
+        run_results = nwm_route(
+            network.connections,
+            network.reverse_network,
+            network.waterbody_connections,
+            network._reaches_by_tw, ## check: arg name differs from the property name (self._reaches_by_tw)
+            parallel_compute_method,
+            compute_kernel,
+            subnetwork_target_size,
+            cpu_pool,
+            network.t0, ## check if t0 is being updated
+            dt,
+            nts,
+            qts_subdivisions,
+            network.independent_networks,
+            network.dataframe,
+            network.q0,
+            network._qlateral,
+            data_assimilation.usgs_df,
+            data_assimilation.lastobs_df,
+            data_assimilation.reservoir_usgs_df,
+            data_assimilation.reservoir_usgs_param_df,
+            data_assimilation.reservoir_usace_df,
+            data_assimilation.reservoir_usace_param_df,
+            data_assimilation.assimilation_parameters,
+            assume_short_ts,
+            return_courant,
+            network._waterbody_df, ## check: arg name differs from the property name (self._waterbody_df)
+            waterbody_parameters,
+            network._waterbody_types_df, ## check: arg name differs from the property name (self._waterbody_types_df)
+            network.waterbody_type_specified,
+            network.diffusive_network_data,
+            network.topobathy_df,
+            network.refactored_diffusive_domain,
+            network.refactored_reaches,
+            subnetwork_list,
+            network.coastal_boundary_depth_df,
+            network.unrefactored_topobathy_df,
+        )
+
+        # returns list, first item is run result, second item is subnetwork items
+        subnetwork_list = run_results[1]
+        run_results = run_results[0]
+
+#        if showtiming:
+#            route_end_time = time.time()
+#            task_times['route_time'] += route_end_time - route_start_time
+
+        # create initial conditions for next loop iteration
+        network.new_nhd_q0(run_results)
+        network.update_waterbody_water_elevation()
+
+        if run_set_iterator < len(run_sets) - 1:
+            # update t0
+            network.new_t0(dt,nts)
+
+            # update forcing data
+            network.assemble_forcings(run_sets[run_set_iterator + 1],
+                                      forcing_parameters,
+                                      hybrid_parameters,
+                                      cpu_pool)
+
+            # get reservoir DA initial parameters for next loop iteration
+            data_assimilation.update(run_results,
+                                     data_assimilation_parameters,
+                                     run_parameters,
+                                     network,
+                                     da_sets[run_set_iterator + 1])
+
+#        if showtiming:
+#            forcing_end_time = time.time()
+#            task_times['forcing_time'] += forcing_end_time - route_end_time
+
+        # TODO move the conditional call to write_lite_restart to nwm_output_generator.
+#        if showtiming:
+#            output_start_time = time.time()
+
+        if "lite_restart" in output_parameters:
+            nhd_io.write_lite_restart(
+                network.q0,
+                network._waterbody_df,
+                t0 + timedelta(seconds = dt * nts),
+                output_parameters['lite_restart']
+            )
+        '''
+        nwm_output_generator(
+            run,
+            run_results,
+            supernetwork_parameters,
+            output_parameters,
+            parity_parameters,
+            restart_parameters,
+            parity_sets[run_set_iterator] if parity_parameters else {},
+            qts_subdivisions,
+            compute_parameters.get("return_courant", False),
+            cpu_pool,
+            network._waterbody_df,
+            network._waterbody_types_df,
+            data_assimilation_parameters,
+            data_assimilation.lastobs_df,
+            network.link_gage_df,
+            network.link_lake_crosswalk,
+        )
+        '''
+#        if showtiming:
+#            output_end_time = time.time()
+#            task_times['output_time'] += output_end_time - output_start_time
+    return run_results
+
+
+def _handle_args_v03(argv):
+    '''
+    Handle command line input argument - filepath of configuration file
+    '''
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
+    parser.add_argument(
+        "-f",
+        "--custom-input-file",
+        dest="custom_input_file",
+        help="Path of a .yaml or .json file containing model configuration parameters. 
See doc/v3_doc.yaml", + ) + + return parser.parse_args(argv) + +def _input_handler_v03(args): + + ''' + Read user inputs from configuration file and set logging level + + Arguments + --------- + Args (argparse.Namespace): Command line input arguments + + Returns + ------- + log_parameters (dict): Input parameters re logging + preprocessing_parameters (dict): Input parameters re preprocessing + supernetwork_parameters (dict): Input parameters re network extent + waterbody_parameters (dict): Input parameters re waterbodies + compute_parameters (dict): Input parameters re computation settings + forcing_parameters (dict): Input parameters re model forcings + restart_parameters (dict): Input parameters re model restart + hybrid_parameters (dict): Input parameters re MC/diffusive wave model + output_parameters (dict): Input parameters re output writing + parity_parameters (dict): Input parameters re parity assessment + data_assimilation_parameters (dict): Input parameters re data assimilation + ''' + # get name of user configuration file (e.g. test.yaml) + custom_input_file = args.custom_input_file + + # read data from user configuration file + ( + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + ) = nhd_io.read_config_file(custom_input_file) + + ''' + # configure python logger + log_level_set(log_parameters) + LOG = logging.getLogger('') + # if log level is at or below DEBUG, then check user inputs + if LOG.level <= 10: # DEBUG + check_inputs( + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters + ) + ''' + return ( + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + ) + +def nwm_route( + downstream_connections, + upstream_connections, + waterbodies_in_connections, + reaches_bytw, + parallel_compute_method, + compute_kernel, + subnetwork_target_size, + cpu_pool, + t0, + dt, + nts, + qts_subdivisions, + independent_networks, + param_df, + q0, + qlats, + usgs_df, + lastobs_df, + reservoir_usgs_df, + reservoir_usgs_param_df, + reservoir_usace_df, + reservoir_usace_param_df, + da_parameter_dict, + assume_short_ts, + return_courant, + waterbodies_df, + waterbody_parameters, + waterbody_types_df, + waterbody_type_specified, + diffusive_network_data, + topobathy, + refactored_diffusive_domain, + refactored_reaches, + subnetwork_list, + coastal_boundary_depth_df, + nonrefactored_topobathy, +): + + ################### Main Execution Loop across ordered networks + + + start_time = time.time() + + if return_courant: + LOG.info( + f"executing routing computation, with Courant evaluation metrics returned" + ) + else: + LOG.info(f"executing routing computation ...") + + start_time_mc = time.time() + results = compute_nhd_routing_v02( + downstream_connections, + upstream_connections, + waterbodies_in_connections, + reaches_bytw, + compute_kernel, + parallel_compute_method, + subnetwork_target_size, # The default here might be the whole network or some percentage... 
+ cpu_pool, + t0, + dt, + nts, + qts_subdivisions, + independent_networks, + param_df, + q0, + qlats, + usgs_df, + lastobs_df, + reservoir_usgs_df, + reservoir_usgs_param_df, + reservoir_usace_df, + reservoir_usace_param_df, + da_parameter_dict, + assume_short_ts, + return_courant, + waterbodies_df, + waterbody_parameters, + waterbody_types_df, + waterbody_type_specified, + subnetwork_list, + ) + LOG.debug("MC computation complete in %s seconds." % (time.time() - start_time_mc)) + # returns list, first item is run result, second item is subnetwork items + subnetwork_list = results[1] + results = results[0] + + # run diffusive side of a hybrid simulation + if diffusive_network_data: + start_time_diff = time.time() + ''' + # retrieve MC-computed streamflow value at upstream boundary of diffusive mainstem + qvd_columns = pd.MultiIndex.from_product( + [range(nts), ["q", "v", "d"]] + ).to_flat_index() + flowveldepth = pd.concat( + [pd.DataFrame(r[1], index=r[0], columns=qvd_columns) for r in results], + copy=False, + ) + ''' + #upstream_boundary_flow={} + #for tw,v in diffusive_network_data.items(): + # upstream_boundary_link = diffusive_network_data[tw]['upstream_boundary_link'] + # flow_ = flowveldepth.loc[upstream_boundary_link][0::3] + # the very first value at time (0,q) is flow value at the first time step after initial time. + # upstream_boundary_flow[tw] = flow_ + + + # call diffusive wave simulation and append results to MC results + results.extend( + compute_diffusive_routing( + results, + diffusive_network_data, + cpu_pool, + t0, + dt, + nts, + q0, + qlats, + qts_subdivisions, + usgs_df, + lastobs_df, + da_parameter_dict, + waterbodies_df, + topobathy, + refactored_diffusive_domain, + refactored_reaches, + coastal_boundary_depth_df, + nonrefactored_topobathy, + ) + ) + LOG.debug("Diffusive computation complete in %s seconds." % (time.time() - start_time_diff)) + + LOG.debug("ordered reach computation complete in %s seconds." % (time.time() - start_time)) + + return results, subnetwork_list + +def create_output_dataframes(results, nts, waterbodies_df, link_lake_crosswalk): + + qvd_columns = pd.MultiIndex.from_product( + [range(int(nts)), ["q", "v", "d"]] + ).to_flat_index() + + flowveldepth = pd.concat( + [pd.DataFrame(r[1], index=r[0], columns=qvd_columns) for r in results], copy=False, + ) + + # create waterbody dataframe for output to netcdf file + i_columns = pd.MultiIndex.from_product( + [range(int(nts)), ["i"]] + ).to_flat_index() + + wbdy = pd.concat( + [pd.DataFrame(r[6], index=r[0], columns=i_columns) for r in results], + copy=False, + ) + + wbdy_id_list = waterbodies_df.index.values.tolist() + + i_lakeout_df = wbdy.loc[wbdy_id_list].iloc[:,-1] + q_lakeout_df = flowveldepth.loc[wbdy_id_list].iloc[:,-3] + d_lakeout_df = flowveldepth.loc[wbdy_id_list].iloc[:,-1] + # lakeout = pd.concat([i_df, q_df, d_df], axis=1) + + # replace waterbody lake_ids with outlet link ids + flowveldepth = _reindex_lake_to_link_id(flowveldepth, link_lake_crosswalk) + + q_channel_df = flowveldepth.iloc[:,-3] + v_channel_df = flowveldepth.iloc[:,-2] + d_channel_df = flowveldepth.iloc[:,-1] + + segment_ids = flowveldepth.index.values.tolist() + + return q_channel_df, v_channel_df, d_channel_df, i_lakeout_df, q_lakeout_df, d_lakeout_df#, wbdy_id_list, + +def _reindex_lake_to_link_id(target_df, crosswalk): + ''' + Utility function for replacing lake ID index values + with link ID values in a dataframe. 
This is used to
+    re-index results dataframes
+
+    Arguments:
+    ----------
+    - target_df (DataFrame): Data frame to be re-indexed
+    - crosswalk (dict): Relates lake ids to outlet link ids
+
+    Returns:
+    --------
+    - target_df (DataFrame): Re-indexed with link ids replacing
+                             lake ids
+    '''
+
+    # evaluate intersection of lake ids and target_df index values
+    # i.e. what are the index positions of lake ids that need replacing?
+    lakeids = np.fromiter(crosswalk.keys(), dtype = int)
+    idxs = target_df.index.to_numpy()
+    lake_index_intersect = np.intersect1d(
+        idxs,
+        lakeids,
+        return_indices = True
+    )
+
+    # replace lake ids with link IDs in the target_df index array
+    linkids = np.fromiter(crosswalk.values(), dtype = int)
+    idxs[lake_index_intersect[1]] = linkids[lake_index_intersect[2]]
+
+    # (re) set the target_df index
+    target_df.set_index(idxs, inplace = True)
+
+    return target_df
\ No newline at end of file
diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py
index 3515cd2d6..4177d618b 100644
--- a/src/troute-network/troute/nhd_io.py
+++ b/src/troute-network/troute/nhd_io.py
@@ -1564,10 +1564,10 @@ def build_coastal_dataframe(coastal_boundary_elev):
     return coastal_df
 
-def build_coastal_ncdf_dataframe(coastal_ncdf):
-    with xr.open_dataset(coastal_ncdf) as ds:
-        coastal_ncdf_df = ds[["elev", "depth"]]
-        return coastal_ncdf_df.to_dataframe()
+#def build_coastal_ncdf_dataframe(coastal_ncdf):
+#    with xr.open_dataset(coastal_ncdf) as ds:
+#        coastal_ncdf_df = ds[["elev", "depth"]]
+#        return coastal_ncdf_df.to_dataframe()
 
 def build_coastal_ncdf_dataframe(
     coastal_files,
@@ -1583,18 +1583,19 @@ def build_coastal_ncdf_dataframe(
         elev_NAVD88 = ds.variables['elev'][:, coastal_boundary_nodes].filled(fill_value = np.nan)
         depth_bathy = ds.variables['depth'][coastal_boundary_nodes].filled(fill_value = np.nan)
         timesteps = ds.variables['time'][:]
         if len(timesteps) > 1:
             dt_schism = timesteps[1]-timesteps[0]
         else:
             raise RuntimeError("schism provided less than 2 time steps")
 
-        start_date = ds.variables['time'].units 
+        start_date = ds.variables['time'].units
         start_date = dparser.parse(start_date,fuzzy=True)
         dt_timeslice = timedelta(minutes=dt_schism/60.0)
-        tfin =  start_date + dt_timeslice*len(timesteps) 
+        tfin = start_date + dt_timeslice*len(timesteps)
         timestamps = pd.date_range(start_date, tfin, freq=dt_timeslice)
         timestamps = timestamps.strftime('%Y-%m-%d %H:%M:%S')
-        
+
         # create a dataframe of water depth at coastal domain nodes
         timeslice_schism_list=[]
         for t in range(0, len(timesteps)+1):
@@ -1613,13 +1614,53 @@ def build_coastal_ncdf_dataframe(
                        unstack(1, fill_value = np.nan)['depth'])
             timeslice_schism_list.append(timeslice_schism)
-        
+
         coastal_boundary_depth_df = pd.concat(timeslice_schism_list, axis=1, ignore_index=False)
 
         # linearly extrapolate depth value at start date
         coastal_boundary_depth_df.iloc[:,0] = 2.0*coastal_boundary_depth_df.iloc[:,1] - coastal_boundary_depth_df.iloc[:,2]
 
     return coastal_boundary_depth_df
+
+def build_coastal_dataframe(
+    run,
+    coastal_boundary_domain,
+    ):
+
+    downstream_boundary_input_folder = run.get("downstream_boundary_input_folder",None)
+
+    if downstream_boundary_input_folder:
+        downstream_boundary_input_folder = pathlib.Path(downstream_boundary_input_folder)
+        if "downstream_boundary_files" in run:
+            downstream_boundary_files = run.get("downstream_boundary_files")
+            downstream_boundary_files = [downstream_boundary_input_folder.joinpath(f) for f in 
downstream_boundary_files] + + timeslice_schism_list=[] + for f in downstream_boundary_files: + ds = xr.open_dataset(f) + df = ds.to_dataframe() + tws = [] + timestamps = [] + depths= [] + for tw, boundary_node in coastal_boundary_domain.items(): + tws.append(tw) + df2 = df[df['schism_id']==boundary_node] + date = df2.time.iloc[0] + timestamps.append(pd.to_datetime(date).strftime('%Y-%m-%d %H:%M:%S')) + depths.append(df2.elev.iat[0] + df2.depth.iat[0]) + + timeslice_schism = (pd.DataFrame({ + 'stationId' : tws, + 'datetime' : timestamps, + 'depth' : depths + }). + set_index(['stationId', 'datetime']). + unstack(1, fill_value = np.nan)['depth']) + timeslice_schism_list.append(timeslice_schism) + + coastal_boundary_depth_df = pd.concat(timeslice_schism_list, axis=1, ignore_index=False) + + return coastal_boundary_depth_df def lastobs_df_output( lastobs_df, diff --git a/test/unit_test_hyfeature/channel_forcing/201512010000SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010000SCHISM.nc new file mode 100644 index 000000000..5cca7c92f Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010000SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010100SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010100SCHISM.nc new file mode 100644 index 000000000..5e5fb6ca0 Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010100SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010200SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010200SCHISM.nc new file mode 100644 index 000000000..816f4e51a Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010200SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010300SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010300SCHISM.nc new file mode 100644 index 000000000..441fea007 Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010300SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010400SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010400SCHISM.nc new file mode 100644 index 000000000..ccf0bfe45 Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010400SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010500SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010500SCHISM.nc new file mode 100644 index 000000000..33a2d6703 Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010500SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010600SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010600SCHISM.nc new file mode 100644 index 000000000..5b73273ce Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010600SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010700SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010700SCHISM.nc new file mode 100644 index 000000000..5d6fb8374 Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010700SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010800SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512010800SCHISM.nc new file mode 100644 index 000000000..d1596153a Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010800SCHISM.nc differ diff --git a/test/unit_test_hyfeature/channel_forcing/201512010900SCHISM.nc 
b/test/unit_test_hyfeature/channel_forcing/201512010900SCHISM.nc
new file mode 100644
index 000000000..d41392db9
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512010900SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011000SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011000SCHISM.nc
new file mode 100644
index 000000000..0ba3f7143
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011000SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011100SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011100SCHISM.nc
new file mode 100644
index 000000000..ca57f9101
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011100SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011200SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011200SCHISM.nc
new file mode 100644
index 000000000..743d8438f
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011200SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011300SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011300SCHISM.nc
new file mode 100644
index 000000000..442cd12d0
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011300SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011400SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011400SCHISM.nc
new file mode 100644
index 000000000..ead97240b
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011400SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011500SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011500SCHISM.nc
new file mode 100644
index 000000000..7ba2386ad
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011500SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011600SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011600SCHISM.nc
new file mode 100644
index 000000000..8812012b9
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011600SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011700SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011700SCHISM.nc
new file mode 100644
index 000000000..af0a02c97
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011700SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011800SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011800SCHISM.nc
new file mode 100644
index 000000000..5609d12d0
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011800SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512011900SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512011900SCHISM.nc
new file mode 100644
index 000000000..33ccf3d67
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512011900SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512012000SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512012000SCHISM.nc
new file mode 100644
index 000000000..9722cb096
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512012000SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512012100SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512012100SCHISM.nc
new file mode 100644
index 000000000..6e338876a
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512012100SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512012200SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512012200SCHISM.nc
new file mode 100644
index 000000000..5228d0733
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512012200SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512012300SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512012300SCHISM.nc
new file mode 100644
index 000000000..cbdeb250d
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512012300SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020000SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020000SCHISM.nc
new file mode 100644
index 000000000..e8a04c311
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020000SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020100SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020100SCHISM.nc
new file mode 100644
index 000000000..de925aef6
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020100SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020200SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020200SCHISM.nc
new file mode 100644
index 000000000..6406e56a4
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020200SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020300SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020300SCHISM.nc
new file mode 100644
index 000000000..8cf50d3a1
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020300SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020400SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020400SCHISM.nc
new file mode 100644
index 000000000..23611877c
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020400SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020500SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020500SCHISM.nc
new file mode 100644
index 000000000..77ab0b436
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020500SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020600SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020600SCHISM.nc
new file mode 100644
index 000000000..ae31e2ddd
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020600SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020700SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020700SCHISM.nc
new file mode 100644
index 000000000..fa10739c1
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020700SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020800SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020800SCHISM.nc
new file mode 100644
index 000000000..3cd168031
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020800SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512020900SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512020900SCHISM.nc
new file mode 100644
index 000000000..24e5341da
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512020900SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021000SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021000SCHISM.nc
new file mode 100644
index 000000000..203df94f3
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021000SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021100SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021100SCHISM.nc
new file mode 100644
index 000000000..3b07f7aae
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021100SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021200SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021200SCHISM.nc
new file mode 100644
index 000000000..1cd6c0927
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021200SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021300SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021300SCHISM.nc
new file mode 100644
index 000000000..99b1078b6
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021300SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021400SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021400SCHISM.nc
new file mode 100644
index 000000000..7fe40bf00
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021400SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021500SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021500SCHISM.nc
new file mode 100644
index 000000000..19d00fc01
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021500SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021600SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021600SCHISM.nc
new file mode 100644
index 000000000..21aa7bd21
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021600SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021700SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021700SCHISM.nc
new file mode 100644
index 000000000..a1a9689db
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021700SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021800SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021800SCHISM.nc
new file mode 100644
index 000000000..95ad6612a
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021800SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512021900SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512021900SCHISM.nc
new file mode 100644
index 000000000..47e850bd6
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512021900SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512022000SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512022000SCHISM.nc
new file mode 100644
index 000000000..89499a16f
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512022000SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512022100SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512022100SCHISM.nc
new file mode 100644
index 000000000..829e37a65
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512022100SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512022200SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512022200SCHISM.nc
new file mode 100644
index 000000000..e020547f5
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512022200SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/channel_forcing/201512022300SCHISM.nc b/test/unit_test_hyfeature/channel_forcing/201512022300SCHISM.nc
new file mode 100644
index 000000000..44f29c9f2
Binary files /dev/null and b/test/unit_test_hyfeature/channel_forcing/201512022300SCHISM.nc differ
diff --git a/test/unit_test_hyfeature/domain/coastal_boundary_domain.yaml b/test/unit_test_hyfeature/domain/coastal_boundary_domain.yaml
index 3374cc333..95ef85195 100644
--- a/test/unit_test_hyfeature/domain/coastal_boundary_domain.yaml
+++ b/test/unit_test_hyfeature/domain/coastal_boundary_domain.yaml
@@ -1,4 +1,5 @@
-10237: 0       # Lower Colorado River
+10237: 252935  # Lower Colorado River
+10238: 2525678 # Lower Colorado River
 #5781901: 252935 # Lower Colorado River
diff --git a/test/unit_test_hyfeature/run-troute-with-bmi.py b/test/unit_test_hyfeature/run-troute-with-bmi.py
new file mode 100644
index 000000000..f31b0bafe
--- /dev/null
+++ b/test/unit_test_hyfeature/run-troute-with-bmi.py
@@ -0,0 +1,29 @@
+import sys
+sys.path.append("../../src/")
+import bmi_troute  # the BMI t-route wrapper defined in src/bmi_troute.py
+import troute.main_utilities as mu  # used only to read q_lateral data from files; not needed once the t-route BMI runs under a model engine
+import troute.nhd_preprocess as nhd_prep  # likewise only needed for file-based q_lateral reads
+
+model = bmi_troute.bmi_troute()
+
+# This call covers lines 41-110 of def main_v04
+model.initialize(bmi_cfg_file='unittest_hyfeature.yaml')
+
+# This call covers lines 112-126
+(run_sets, da_sets, parity_sets) = mu.build_run_sets(model._network,
+                                                     model._supernetwork_parameters,
+                                                     model._forcing_parameters,
+                                                     model._compute_parameters,
+                                                     model._data_assimilation_parameters,
+                                                     model._output_parameters,
+                                                     model._parity_parameters)
+
+# Retrieve forcing input data such as lateral flow and coastal boundary data
+cpu_pool = model._compute_parameters.get("cpu_pool", None)
+model._network.assemble_forcings(run_sets[0], model._forcing_parameters, model._hybrid_parameters, cpu_pool)
+
+# Move forcing data into BMI-compatible variables (model._values)
+model.set_value('land_surface_water_source__volume_flow_rate', model._network._qlateral.to_numpy())
+model.set_value('coastal_boundary__depth', model._network._coastal_boundary_depth_df.to_numpy())
+
+#model.update()
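Note: the script above stages forcings but stops short of stepping the model (the final model.update() is commented out, and a leftover pdb breakpoint was removed). For orientation only, here is a minimal sketch of how the run could be completed through the standard BMI control functions. The step count, the output variable name, and the availability of the bmipy getters (get_var_grid, get_grid_size, get_value) on the wrapper are assumptions, not part of this patch:

import numpy as np

# Hypothetical continuation: advance t-route over one forcing set.
# The loop bound of 12 substeps is an assumption taken from
# qts_subdivisions in the test configuration (12 routing steps per
# hourly forcing interval).
for _ in range(12):
    model.update()

# Read routed flows back out through the standard bmipy getters; the
# variable name is one of the wrapper's declared outputs, and the dest
# buffer sizing assumes the wrapper implements get_var_grid/get_grid_size.
var = 'channel_exit_water_x-section__volume_flow_rate'
dest = np.zeros(model.get_grid_size(model.get_var_grid(var)))
model.get_value(var, dest)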
diff --git a/test/unit_test_hyfeature/unittest_hyfeature.yaml b/test/unit_test_hyfeature/unittest_hyfeature.yaml
index a04032d8c..2bc0ddb1f 100644
--- a/test/unit_test_hyfeature/unittest_hyfeature.yaml
+++ b/test/unit_test_hyfeature/unittest_hyfeature.yaml
@@ -57,16 +57,18 @@ compute_parameters:
     coastal_boundary_domain: domain/coastal_boundary_domain.yaml
 forcing_parameters:
     #----------
-    qts_subdivisions : 12
-    dt : 300 # [sec]
-    qlat_input_folder : channel_forcing/
-    qlat_file_pattern_filter : "*.CHRTOUT_DOMAIN1"
-    nexus_input_folder : channel_forcing/
-    nexus_file_pattern_filter : "*NEXOUT.csv" #OR "*NEXOUT.parquet" OR "nex-*"
-    binary_nexus_file_folder : binary_files # this is required if nexus_file_pattern_filter="nex-*"
-    coastal_boundary_input_file : channel_forcing/schout_1.nc
-    nts : 48 #288 for 1day
-    max_loop_size : 2 # [hr]
+    qts_subdivisions                        : 12
+    dt                                      : 300 # [sec]
+    qlat_input_folder                       : channel_forcing/
+    qlat_file_pattern_filter                : "*.CHRTOUT_DOMAIN1"
+    nexus_input_folder                      : channel_forcing/
+    nexus_file_pattern_filter               : "*NEXOUT.csv" #OR "*NEXOUT.parquet" OR "nex-*"
+    binary_nexus_file_folder                : binary_files # this is required if nexus_file_pattern_filter="nex-*"
+    coastal_boundary_input_file             : channel_forcing/schout_1.nc
+    downstream_boundary_input_folder        : channel_forcing/
+    downstream_boundary_file_pattern_filter : "*SCHISM.nc"
+    nts                                     : 288 #288 for 1day
+    max_loop_size                           : 2 # [hr]
 data_assimilation_parameters:
     #----------
     usgs_timeslices_folder : #usgs_TimeSlice/
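The new forcing parameters are internally consistent, and the added downstream-boundary keys pick up the hourly SCHISM NetCDF files introduced earlier in this change set. A quick sanity check (a sketch; paths assume it is run from test/unit_test_hyfeature/, as run-troute-with-bmi.py is):

from pathlib import Path

dt = 300               # routing timestep [sec], from the YAML above
nts = 288              # routing timesteps per run
qts_subdivisions = 12  # routing substeps per forcing interval

assert nts * dt == 86400              # 288 x 300 s = 1 day, matching the "#288 for 1day" comment
assert qts_subdivisions * dt == 3600  # forcing inputs are hourly

# downstream_boundary_file_pattern_filter = "*SCHISM.nc" matches the
# hourly boundary files added under channel_forcing/ in this patch.
schism_files = sorted(Path('channel_forcing').glob('*SCHISM.nc'))
print(f'{len(schism_files)} downstream boundary files found')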