Skip to content

Commit

Permalink
Merge pull request #282 from UMEP-dev/fix-df_debug
Browse files Browse the repository at this point in the history
improve debug mode handling
  • Loading branch information
sunt05 authored Aug 6, 2024
2 parents 7787e6c + 81491b8 commit 55ff2d9
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 54 deletions.
5 changes: 4 additions & 1 deletion src/supy/_post.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import numpy as np
import pandas as pd
import copy
from .supy_driver import suews_driver as sd


Expand Down Expand Up @@ -276,6 +277,7 @@ def pack_dict_debug(dts_debug):
Returns:
dict: A dictionary containing the packed debug information.
"""

list_props = [
attr
for attr in dir(dts_debug)
Expand All @@ -289,7 +291,7 @@ def pack_dict_debug(dts_debug):
getattr(dts_debug, prop)
if is_numeric(getattr(dts_debug, prop))
# if some properties are fortran derived types then iterate use this function
else pack_dict_debug(getattr(dts_debug, prop))
else pack_dict_debug(copy.deepcopy(getattr(dts_debug, prop)))
)
for prop in list_props
}
Expand Down Expand Up @@ -348,6 +350,7 @@ def pack_df_debug(dict_debug):
df_debug_raw = pack_df_debug_raw(dict_debug)
df_debug_raw.index = df_debug_raw.index.rename(
[
"datetime",
"grid",
"step",
"group",
Expand Down
88 changes: 43 additions & 45 deletions src/supy/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import numpy as np
import pandas as pd
from .supy_driver import suews_driver as sd
import copy

from ._load import (
df_var_info,
Expand Down Expand Up @@ -140,21 +141,13 @@ def suews_cal_tstep_multi(dict_state_start, df_forcing_block):
"len_sim": np.array(df_forcing_block.shape[0], dtype=int),
}
)
# print("list_var_input_multitsteps", sorted(list_var_input_multitsteps))
# print("dict_input.keys()", sorted(dict_input.keys()))
# set_dif=set(dict_input.keys()) - set(list_var_input_multitsteps)
# print("set_dif", sorted(set_dif))

dict_input = {k: dict_input[k] for k in list_var_input_multitsteps}
# dict_input = {k: dict_input[k] if k in dict_input else np.array([0.0]) for k in list_var_input_multitsteps}

# main calculation:

try:
res_suews_tstep_multi, res_mod_state = sd.suews_cal_multitsteps(**dict_input)
# TODO: #233 res_mod_state will be used in the future for debugging purpose
# convert res_mod_state to a dict
# Assuming dts_debug is your object instance
dict_debug = pack_dict_debug(res_mod_state)

except Exception as ex:
# show trace info
Expand Down Expand Up @@ -195,6 +188,12 @@ def suews_cal_tstep_multi(dict_state_start, df_forcing_block):
dict_output_array = dict(zip(list_var, list_arr))
df_output_block = pack_df_output_block(dict_output_array, df_forcing_block)

# TODO: #233 res_mod_state will be used in the future for debugging purpose
# convert res_mod_state to a dict
# Assuming dts_debug is your object instance
# deepcopy is used to avoid reference issue when passing the object
dict_debug = copy.deepcopy(pack_dict_debug(res_mod_state))

return dict_state_end, df_output_block, dict_debug


Expand Down Expand Up @@ -359,35 +358,32 @@ def run_supy_ser(
df_state_init_chunk = df_state_init.copy()
list_df_output = []
list_df_state = []
list_df_debug = []
for grp in grp_forcing_chunk.groups:
# get forcing of a specific year
df_forcing_chunk = grp_forcing_chunk.get_group(grp)
# run supy: actual execution done in the `else` clause below
df_output_chunk, df_state_final_chunk = run_supy_ser(
df_forcing_chunk, df_state_init_chunk, chunk_day=chunk_day
df_output_chunk, df_state_final_chunk, df_debug_chunk = run_supy_ser(
df_forcing_chunk,
df_state_init_chunk,
chunk_day=chunk_day,
)
df_state_init_chunk = df_state_final_chunk.copy()
# collect results
list_df_output.append(df_output_chunk)
list_df_state.append(df_state_final_chunk)
list_df_debug.append(df_debug_chunk)

# re-organise results of each year
df_output = pd.concat(list_df_output).sort_index()
df_state_final = pd.concat(list_df_state).sort_index().drop_duplicates()
return df_output, df_state_final
df_debug = pd.concat(list_df_debug).sort_index()

else:
# for single-chunk run (1 chunk = {chunk_day} years), directly put df_forcing into supy_driver for calculation
# for single-chunk run (1 chunk = {chunk_day} days), directly put df_forcing into supy_driver for calculation
# use higher level wrapper that calculate at a `block` level
# for better performance

# # construct input list for `Pool.starmap`
# construct input list for `dask.bag`
list_dict_state_input = [
# (dict_state[(tstep_init, grid)], df_forcing)
dict_state[(tstep_init, grid)]
for grid in list_grid
]
# for better performance by reducing runtime memory usage
list_dict_state_input = [dict_state[(tstep_init, grid)] for grid in list_grid]

try:
list_res_grid = [
Expand All @@ -397,7 +393,7 @@ def run_supy_ser(

list_dict_state_end, list_df_output, list_dict_debug = zip(*list_res_grid)

except:
except Exception:
path_zip_debug = save_zip_debug(df_forcing, df_state_init)
raise RuntimeError(
f"\n====================\n"
Expand All @@ -419,46 +415,43 @@ def run_supy_ser(
for grid, dict_state_end in zip(list_grid, list_dict_state_end)
}
dict_state.update(dict_state_final_tstep)

# collect debug info
dict_debug = {grid: debug for grid, debug in zip(list_grid, list_dict_debug)}
df_debug = pack_df_debug(dict_debug)
df_state_final = pack_df_state(dict_state).swaplevel(0, 1)
# pack final model states into a proper dataframe
df_state_final = pack_df_state_final(df_state_final, df_init)

# save results as time-aware DataFrame
df_output0 = pd.concat(dict_df_output, names=["grid"]).sort_index()
df_output = df_output0.replace(-999.0, np.nan)
df_state_final = pack_df_state(dict_state).swaplevel(0, 1)

# drop ESTM for now as it is not supported yet
df_output = df_output.drop("ESTM", axis=1, level="group")
# trim multi-index based columns
df_output.columns = df_output.columns.remove_unused_levels()
# drop ESTM for now as it is not supported yet
df_output = df_output.drop("ESTM", axis=1, level="group")
# trim multi-index based columns
df_output.columns = df_output.columns.remove_unused_levels()

# pack final model states into a proper dataframe
df_state_final = pack_df_state_final(df_state_final, df_init)
# collect debug info
dict_debug = {
(tstep_final, grid): debug
for grid, debug in zip(list_grid, list_dict_debug)
}
df_debug = pack_df_debug(dict_debug)

# return results
try:
if df_state_init["debug"].any().any():
return df_output, df_state_final, df_debug
else:
return df_output, df_state_final
except KeyError:
return df_output, df_state_final
return df_output, df_state_final, df_debug


def run_save_supy(
    df_forcing_tstep, df_state_init_m, ind, save_state, n_yr, path_dir_temp
):
    """Run SuPy in serial mode for one chunk and pickle the results to disk.

    The three result frames (output, final model state, debug info) are
    written to ``path_dir_temp`` as ``{ind}_out.pkl``, ``{ind}_state.pkl``
    and ``{ind}_debug.pkl`` so the parallel driver can collect them later
    by chunk index.
    """
    # serial-mode simulation for this chunk
    res_output, res_state, res_debug = run_supy_ser(
        df_forcing_tstep, df_state_init_m, save_state, n_yr
    )
    # persist each frame under a tag keyed by the chunk index `ind`;
    # file names must match what the collecting side reads back
    for tag, frame in (
        ("out", res_output),
        ("state", res_state),
        ("debug", res_debug),
    ):
        frame.to_pickle(path_dir_temp / f"{ind}_{tag}.pkl")


# parallel mode: only used on Linux/macOS; Windows is not supported yet.
Expand Down Expand Up @@ -499,9 +492,14 @@ def run_supy_par(df_forcing_tstep, df_state_init_m, save_state, n_yr):
for n in np.arange(n_grid)
]
)
# print(list(path_dir_temp.glob('*')))
df_debug = pd.concat(
[
pd.read_pickle(path_dir_temp / f"{n}_debug.pkl")
for n in np.arange(n_grid)
]
)

return df_output, df_state_final
return df_output, df_state_final, df_debug


# main calculation end here
Expand Down
17 changes: 9 additions & 8 deletions src/supy/_supy_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,6 @@ def run_supy(
if debug_mode:
logging_level = logging.DEBUG
logger_supy.setLevel(logging_level)
df_state_init.loc[:, ("debug", "0")] = 1
else:
df_state_init.loc[:, ("debug", "0")] = 0

# set up a timer for simulation time
start = time.time()
Expand Down Expand Up @@ -392,12 +389,16 @@ def run_supy(
end = time.time()
logger_supy.info(f"Execution time: {(end - start):.1f} s")
logger_supy.info(f"====================\n")
try:
df_output, df_state_final = res_supy
return df_output, df_state_final
except:
df_output, df_state_final, res_debug = res_supy

# unpack results
df_output, df_state_final, res_debug = res_supy

# return results based on debugging needs
if debug_mode:
return df_output, df_state_final, res_debug
else:
return df_output, df_state_final



##############################################################################
Expand Down

0 comments on commit 55ff2d9

Please sign in to comment.