Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

improve debug mode handling #282

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/supy/_post.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import numpy as np
import pandas as pd
import copy
from .supy_driver import suews_driver as sd


Expand Down Expand Up @@ -276,6 +277,7 @@ def pack_dict_debug(dts_debug):
Returns:
dict: A dictionary containing the packed debug information.
"""

list_props = [
attr
for attr in dir(dts_debug)
Expand All @@ -289,7 +291,7 @@ def pack_dict_debug(dts_debug):
getattr(dts_debug, prop)
if is_numeric(getattr(dts_debug, prop))
# if some properties are fortran derived types then iterate use this function
else pack_dict_debug(getattr(dts_debug, prop))
else pack_dict_debug(copy.deepcopy(getattr(dts_debug, prop)))
)
for prop in list_props
}
Expand Down Expand Up @@ -348,6 +350,7 @@ def pack_df_debug(dict_debug):
df_debug_raw = pack_df_debug_raw(dict_debug)
df_debug_raw.index = df_debug_raw.index.rename(
[
"datetime",
"grid",
"step",
"group",
Expand Down
88 changes: 43 additions & 45 deletions src/supy/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import numpy as np
import pandas as pd
from .supy_driver import suews_driver as sd
import copy

from ._load import (
df_var_info,
Expand Down Expand Up @@ -140,21 +141,13 @@ def suews_cal_tstep_multi(dict_state_start, df_forcing_block):
"len_sim": np.array(df_forcing_block.shape[0], dtype=int),
}
)
# print("list_var_input_multitsteps", sorted(list_var_input_multitsteps))
# print("dict_input.keys()", sorted(dict_input.keys()))
# set_dif=set(dict_input.keys()) - set(list_var_input_multitsteps)
# print("set_dif", sorted(set_dif))

dict_input = {k: dict_input[k] for k in list_var_input_multitsteps}
# dict_input = {k: dict_input[k] if k in dict_input else np.array([0.0]) for k in list_var_input_multitsteps}

# main calculation:

try:
res_suews_tstep_multi, res_mod_state = sd.suews_cal_multitsteps(**dict_input)
# TODO: #233 res_mod_state will be used in the future for debugging purpose
# convert res_mod_state to a dict
# Assuming dts_debug is your object instance
dict_debug = pack_dict_debug(res_mod_state)

except Exception as ex:
# show trace info
Expand Down Expand Up @@ -195,6 +188,12 @@ def suews_cal_tstep_multi(dict_state_start, df_forcing_block):
dict_output_array = dict(zip(list_var, list_arr))
df_output_block = pack_df_output_block(dict_output_array, df_forcing_block)

# TODO: #233 res_mod_state will be used in the future for debugging purpose
# convert res_mod_state to a dict
# Assuming dts_debug is your object instance
# deepcopy is used to avoid reference issue when passing the object
dict_debug = copy.deepcopy(pack_dict_debug(res_mod_state))

return dict_state_end, df_output_block, dict_debug


Expand Down Expand Up @@ -359,35 +358,32 @@ def run_supy_ser(
df_state_init_chunk = df_state_init.copy()
list_df_output = []
list_df_state = []
list_df_debug = []
for grp in grp_forcing_chunk.groups:
# get forcing of a specific year
df_forcing_chunk = grp_forcing_chunk.get_group(grp)
# run supy: actual execution done in the `else` clause below
df_output_chunk, df_state_final_chunk = run_supy_ser(
df_forcing_chunk, df_state_init_chunk, chunk_day=chunk_day
df_output_chunk, df_state_final_chunk, df_debug_chunk = run_supy_ser(
df_forcing_chunk,
df_state_init_chunk,
chunk_day=chunk_day,
)
df_state_init_chunk = df_state_final_chunk.copy()
# collect results
list_df_output.append(df_output_chunk)
list_df_state.append(df_state_final_chunk)
list_df_debug.append(df_debug_chunk)

# re-organise results of each year
df_output = pd.concat(list_df_output).sort_index()
df_state_final = pd.concat(list_df_state).sort_index().drop_duplicates()
return df_output, df_state_final
df_debug = pd.concat(list_df_debug).sort_index()

else:
# for single-chunk run (1 chunk = {chunk_day} years), directly put df_forcing into supy_driver for calculation
# for single-chunk run (1 chunk = {chunk_day} days), directly put df_forcing into supy_driver for calculation
# use higher level wrapper that calculate at a `block` level
# for better performance

# # construct input list for `Pool.starmap`
# construct input list for `dask.bag`
list_dict_state_input = [
# (dict_state[(tstep_init, grid)], df_forcing)
dict_state[(tstep_init, grid)]
for grid in list_grid
]
# for better performance by reducing runtime memory usage
list_dict_state_input = [dict_state[(tstep_init, grid)] for grid in list_grid]

try:
list_res_grid = [
Expand All @@ -397,7 +393,7 @@ def run_supy_ser(

list_dict_state_end, list_df_output, list_dict_debug = zip(*list_res_grid)

except:
except Exception:
path_zip_debug = save_zip_debug(df_forcing, df_state_init)
raise RuntimeError(
f"\n====================\n"
Expand All @@ -419,46 +415,43 @@ def run_supy_ser(
for grid, dict_state_end in zip(list_grid, list_dict_state_end)
}
dict_state.update(dict_state_final_tstep)

# collect debug info
dict_debug = {grid: debug for grid, debug in zip(list_grid, list_dict_debug)}
df_debug = pack_df_debug(dict_debug)
df_state_final = pack_df_state(dict_state).swaplevel(0, 1)
# pack final model states into a proper dataframe
df_state_final = pack_df_state_final(df_state_final, df_init)

# save results as time-aware DataFrame
df_output0 = pd.concat(dict_df_output, names=["grid"]).sort_index()
df_output = df_output0.replace(-999.0, np.nan)
df_state_final = pack_df_state(dict_state).swaplevel(0, 1)

# drop ESTM for now as it is not supported yet
df_output = df_output.drop("ESTM", axis=1, level="group")
# trim multi-index based columns
df_output.columns = df_output.columns.remove_unused_levels()
# drop ESTM for now as it is not supported yet
df_output = df_output.drop("ESTM", axis=1, level="group")
# trim multi-index based columns
df_output.columns = df_output.columns.remove_unused_levels()

# pack final model states into a proper dataframe
df_state_final = pack_df_state_final(df_state_final, df_init)
# collect debug info
dict_debug = {
(tstep_final, grid): debug
for grid, debug in zip(list_grid, list_dict_debug)
}
df_debug = pack_df_debug(dict_debug)

# return results
try:
if df_state_init["debug"].any().any():
return df_output, df_state_final, df_debug
else:
return df_output, df_state_final
except KeyError:
return df_output, df_state_final
return df_output, df_state_final, df_debug


def run_save_supy(
df_forcing_tstep, df_state_init_m, ind, save_state, n_yr, path_dir_temp
):
# run supy in serial mode
df_output, df_state_final = run_supy_ser(
df_output, df_state_final, df_debug = run_supy_ser(
df_forcing_tstep, df_state_init_m, save_state, n_yr
)
# save to path_dir_temp
path_out = path_dir_temp / f"{ind}_out.pkl"
path_state = path_dir_temp / f"{ind}_state.pkl"
path_debug = path_dir_temp / f"{ind}_debug.pkl"
df_output.to_pickle(path_out)
df_state_final.to_pickle(path_state)
df_debug.to_pickle(path_debug)


# parallel mode: only used on Linux/macOS; Windows is not supported yet.
Expand Down Expand Up @@ -499,9 +492,14 @@ def run_supy_par(df_forcing_tstep, df_state_init_m, save_state, n_yr):
for n in np.arange(n_grid)
]
)
# print(list(path_dir_temp.glob('*')))
df_debug = pd.concat(
[
pd.read_pickle(path_dir_temp / f"{n}_debug.pkl")
for n in np.arange(n_grid)
]
)

return df_output, df_state_final
return df_output, df_state_final, df_debug


# main calculation end here
Expand Down
17 changes: 9 additions & 8 deletions src/supy/_supy_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,6 @@ def run_supy(
if debug_mode:
logging_level = logging.DEBUG
logger_supy.setLevel(logging_level)
df_state_init.loc[:, ("debug", "0")] = 1
else:
df_state_init.loc[:, ("debug", "0")] = 0

# set up a timer for simulation time
start = time.time()
Expand Down Expand Up @@ -392,12 +389,16 @@ def run_supy(
end = time.time()
logger_supy.info(f"Execution time: {(end - start):.1f} s")
logger_supy.info(f"====================\n")
try:
df_output, df_state_final = res_supy
return df_output, df_state_final
except:
df_output, df_state_final, res_debug = res_supy

# unpack results
df_output, df_state_final, res_debug = res_supy

# return results based on debugging needs
if debug_mode:
return df_output, df_state_final, res_debug
else:
return df_output, df_state_final



##############################################################################
Expand Down
Loading