From 460203c5b54c597e4416bbef30d67fc2a9da3ee8 Mon Sep 17 00:00:00 2001 From: Jeffrey Newman Date: Sun, 12 Nov 2023 12:54:45 -0600 Subject: [PATCH] Fix memory usage (#751) * clear cache of flow.tree * categorical time period dtype * add pydantic for tests * use time_label_dtype only when available * allow missing taz skim_dict * recover tree when needed * predigitized time periods * pass sh_tree back again for tracing * better error message --- .../abm/models/parking_location_choice.py | 2 +- activitysim/abm/models/trip_mode_choice.py | 2 +- activitysim/abm/models/util/logsums.py | 16 +++--- activitysim/abm/models/util/mode.py | 8 ++- .../models/util/vectorize_tour_scheduling.py | 50 +++++++++++++------ activitysim/abm/tables/landuse.py | 12 +++-- activitysim/core/flow.py | 36 ++++++++++--- activitysim/core/interaction_simulate.py | 13 +++-- activitysim/core/los.py | 24 +++++++-- activitysim/core/simulate.py | 6 +-- activitysim/core/skim_dataset.py | 30 +++++++++-- conda-environments/github-actions-tests.yml | 1 + 12 files changed, 147 insertions(+), 53 deletions(-) diff --git a/activitysim/abm/models/parking_location_choice.py b/activitysim/abm/models/parking_location_choice.py index 870f01af9..dbec927be 100644 --- a/activitysim/abm/models/parking_location_choice.py +++ b/activitysim/abm/models/parking_location_choice.py @@ -318,7 +318,7 @@ def parking_location( if "trip_period" not in trips_merged_df: # TODO: resolve this to the skim time period index not the label, it will be faster trips_merged_df["trip_period"] = network_los.skim_time_period_label( - trips_merged_df[proposed_trip_departure_period] + trips_merged_df[proposed_trip_departure_period], as_cat=True ) model_settings["TRIP_DEPARTURE_PERIOD"] = "trip_period" diff --git a/activitysim/abm/models/trip_mode_choice.py b/activitysim/abm/models/trip_mode_choice.py index 8f3e9f418..907444137 100644 --- a/activitysim/abm/models/trip_mode_choice.py +++ b/activitysim/abm/models/trip_mode_choice.py @@ -73,7 +73,7 @@ def trip_mode_choice( # setup skim keys assert "trip_period" not in trips_merged trips_merged["trip_period"] = network_los.skim_time_period_label( - trips_merged.depart + trips_merged.depart, as_cat=True ) orig_col = "origin" diff --git a/activitysim/abm/models/util/logsums.py b/activitysim/abm/models/util/logsums.py index c48586a86..fff541e92 100644 --- a/activitysim/abm/models/util/logsums.py +++ b/activitysim/abm/models/util/logsums.py @@ -75,10 +75,10 @@ def compute_logsums( # FIXME - are we ok with altering choosers (so caller doesn't have to set these)? if (in_period_col is not None) and (out_period_col is not None): choosers["in_period"] = network_los.skim_time_period_label( - choosers[in_period_col] + choosers[in_period_col], as_cat=True ) choosers["out_period"] = network_los.skim_time_period_label( - choosers[out_period_col] + choosers[out_period_col], as_cat=True ) elif ("in_period" not in choosers.columns) and ( "out_period" not in choosers.columns @@ -92,17 +92,21 @@ def compute_logsums( and tour_purpose in model_settings["OUT_PERIOD"] ): choosers["in_period"] = network_los.skim_time_period_label( - model_settings["IN_PERIOD"][tour_purpose] + model_settings["IN_PERIOD"][tour_purpose], + as_cat=True, + broadcast_to=choosers.index, ) choosers["out_period"] = network_los.skim_time_period_label( - model_settings["OUT_PERIOD"][tour_purpose] + model_settings["OUT_PERIOD"][tour_purpose], + as_cat=True, + broadcast_to=choosers.index, ) else: choosers["in_period"] = network_los.skim_time_period_label( - model_settings["IN_PERIOD"] + model_settings["IN_PERIOD"], as_cat=True, broadcast_to=choosers.index ) choosers["out_period"] = network_los.skim_time_period_label( - model_settings["OUT_PERIOD"] + model_settings["OUT_PERIOD"], as_cat=True, broadcast_to=choosers.index ) else: logger.error("Choosers table already has columns 'in_period' and 'out_period'.") diff --git a/activitysim/abm/models/util/mode.py b/activitysim/abm/models/util/mode.py index 8a75ae8b6..3c0d2a5ed 100644 --- a/activitysim/abm/models/util/mode.py +++ b/activitysim/abm/models/util/mode.py @@ -131,8 +131,12 @@ def run_tour_mode_choice_simulate( assert ("in_period" not in choosers) and ("out_period" not in choosers) in_time = skims["in_time_col_name"] out_time = skims["out_time_col_name"] - choosers["in_period"] = network_los.skim_time_period_label(choosers[in_time]) - choosers["out_period"] = network_los.skim_time_period_label(choosers[out_time]) + choosers["in_period"] = network_los.skim_time_period_label( + choosers[in_time], as_cat=True + ) + choosers["out_period"] = network_los.skim_time_period_label( + choosers[out_time], as_cat=True + ) expressions.annotate_preprocessors( state, choosers, locals_dict, skims, model_settings, trace_label diff --git a/activitysim/abm/models/util/vectorize_tour_scheduling.py b/activitysim/abm/models/util/vectorize_tour_scheduling.py index 775d84b7b..297a61e33 100644 --- a/activitysim/abm/models/util/vectorize_tour_scheduling.py +++ b/activitysim/abm/models/util/vectorize_tour_scheduling.py @@ -185,6 +185,12 @@ def dedupe_alt_tdd(state: workflow.State, alt_tdd, tour_purpose, trace_label): logger.info("tdd_alt_segments specified for representative logsums") + if tdd_segments is not None: + # apply categorical dtypes + tdd_segments["time_period"] = tdd_segments["time_period"].astype( + alt_tdd["out_period"].dtype + ) + with chunk.chunk_log( state, tracing.extend_trace_label(trace_label, "dedupe_alt_tdd") ) as chunk_sizer: @@ -328,11 +334,12 @@ def compute_tour_scheduling_logsums( assert "out_period" not in alt_tdd assert "in_period" not in alt_tdd - # FIXME:MEMORY - # These two lines each generate a massive array of strings, - # using a bunch of RAM and slowing things down. - alt_tdd["out_period"] = network_los.skim_time_period_label(alt_tdd["start"]) - alt_tdd["in_period"] = network_los.skim_time_period_label(alt_tdd["end"]) + alt_tdd["out_period"] = network_los.skim_time_period_label( + alt_tdd["start"], as_cat=True + ) + alt_tdd["in_period"] = network_los.skim_time_period_label( + alt_tdd["end"], as_cat=True + ) alt_tdd["duration"] = alt_tdd["end"] - alt_tdd["start"] @@ -383,17 +390,28 @@ def compute_tour_scheduling_logsums( # tracing.log_runtime(model_name=trace_label, start_time=t0) - # redupe - join the alt_tdd_period logsums to alt_tdd to get logsums for alt_tdd - logsums = ( - pd.merge( - alt_tdd.reset_index(), - deduped_alt_tdds.reset_index(), - on=[index_name] + redupe_columns, - how="left", - ) - .set_index(index_name) - .logsums - ) + logsums = pd.Series(data=0, index=alt_tdd.index, dtype=np.float64) + left_on = [alt_tdd.index] + right_on = [deduped_alt_tdds.index] + for i in redupe_columns: + if ( + alt_tdd[i].dtype == "category" + and alt_tdd[i].dtype.ordered + and alt_tdd[i].dtype == deduped_alt_tdds[i].dtype + ): + left_on += [alt_tdd[i].cat.codes] + right_on += [deduped_alt_tdds[i].cat.codes] + else: + left_on += [alt_tdd[i].to_numpy()] + right_on += [deduped_alt_tdds[i].to_numpy()] + + logsums.iloc[:] = pd.merge( + pd.DataFrame(index=alt_tdd.index), + deduped_alt_tdds.logsums, + left_on=left_on, + right_on=right_on, + how="left", + ).logsums.to_numpy() chunk_sizer.log_df(trace_label, "logsums", logsums) del deduped_alt_tdds diff --git a/activitysim/abm/tables/landuse.py b/activitysim/abm/tables/landuse.py index 8d9376b75..9abc0c2e7 100644 --- a/activitysim/abm/tables/landuse.py +++ b/activitysim/abm/tables/landuse.py @@ -23,12 +23,16 @@ def land_use(state: workflow.State): sharrow_enabled = state.settings.sharrow if sharrow_enabled: + err_msg = ( + "a zero-based land_use index is required for sharrow,\n" + "try adding `recode_pipeline_columns: true` to your settings file." + ) # when using sharrow, the land use file must be organized (either in raw # form or via recoding) so that the index is zero-based and contiguous - assert df.index.is_monotonic_increasing - assert df.index[0] == 0 - assert df.index[-1] == len(df.index) - 1 - assert df.index.dtype.kind == "i" + assert df.index.is_monotonic_increasing, err_msg + assert df.index[0] == 0, err_msg + assert df.index[-1] == len(df.index) - 1, err_msg + assert df.index.dtype.kind == "i", err_msg # try to make life easy for everybody by keeping everything in canonical order # but as long as coalesce_pipeline doesn't sort tables it coalesces, it might not stay in order diff --git a/activitysim/core/flow.py b/activitysim/core/flow.py index 6d1e8e257..92429e7d4 100644 --- a/activitysim/core/flow.py +++ b/activitysim/core/flow.py @@ -267,6 +267,7 @@ def skims_mapping( parking_col_name=None, zone_layer=None, primary_origin_col_name=None, + predigitized_time_periods=False, ): logger.info("loading skims_mapping") logger.info(f"- orig_col_name: {orig_col_name}") @@ -337,6 +338,10 @@ def skims_mapping( ), ) else: + if predigitized_time_periods: + time_rel = "_code ->" + else: + time_rel = " @" return dict( # TODO:SHARROW: organize dimensions. odt_skims=skim_dataset, @@ -347,16 +352,16 @@ def skims_mapping( relationships=( f"df._orig_col_name -> odt_skims.{odim}", f"df._dest_col_name -> odt_skims.{ddim}", - "df.out_period @ odt_skims.time_period", + f"df.out_period{time_rel} odt_skims.time_period", f"df._dest_col_name -> dot_skims.{odim}", f"df._orig_col_name -> dot_skims.{ddim}", - "df.in_period @ dot_skims.time_period", + f"df.in_period{time_rel} dot_skims.time_period", f"df._orig_col_name -> odr_skims.{odim}", f"df._dest_col_name -> odr_skims.{ddim}", - "df.in_period @ odr_skims.time_period", + f"df.in_period{time_rel} odr_skims.time_period", f"df._dest_col_name -> dor_skims.{odim}", f"df._orig_col_name -> dor_skims.{ddim}", - "df.out_period @ dor_skims.time_period", + f"df.out_period{time_rel} dor_skims.time_period", f"df._orig_col_name -> od_skims.{odim}", f"df._dest_col_name -> od_skims.{ddim}", ), @@ -525,6 +530,15 @@ def new_flow( cache_dir = state.filesystem.get_sharrow_cache_dir() logger.debug(f"flow.cache_dir: {cache_dir}") + predigitized_time_periods = False + if "out_period" in choosers and "in_period" in choosers: + if ( + choosers["out_period"].dtype == "category" + and choosers["in_period"].dtype == "category" + ): + choosers["out_period_code"] = choosers["out_period"].cat.codes + choosers["in_period_code"] = choosers["in_period"].cat.codes + predigitized_time_periods = True skims_mapping_ = skims_mapping( state, orig_col_name, @@ -534,6 +548,7 @@ def new_flow( parking_col_name=parking_col_name, zone_layer=zone_layer, primary_origin_col_name=primary_origin_col_name, + predigitized_time_periods=predigitized_time_periods, ) if size_term_mapping is None: size_term_mapping = {} @@ -774,6 +789,9 @@ def apply_flow( it ever again, but having a reference to it available later can be useful in debugging and tracing. Flows are cached and reused anyway, so it is generally not important to delete this at any point to free resources. + tree : sharrow.DataTree + The tree data used to compute the flow result. It is seperate from the + flow to prevent it from being cached with the flow. """ if sh is None: return None, None @@ -800,7 +818,7 @@ def apply_flow( logger.error(f"error in apply_flow: {err!s}") if required: raise - return None, None + return None, None, None else: raise with logtime(f"{flow.name}.load", trace_label or ""): @@ -822,7 +840,9 @@ def apply_flow( logger.error(f"error in apply_flow: {err!s}") if required: raise - return None, flow + tree = flow.tree + flow.tree = None + return None, flow, tree raise except Exception as err: logger.error(f"error in apply_flow: {err!s}") @@ -833,4 +853,6 @@ def apply_flow( # Detecting compilation activity when in production mode is a bug # that should be investigated. tracing.timing_notes.add(f"compiled:{flow.name}") - return flow_result, flow + tree = flow.tree + flow.tree = None + return flow_result, flow, tree diff --git a/activitysim/core/interaction_simulate.py b/activitysim/core/interaction_simulate.py index 88dbfc73d..780bd53e7 100644 --- a/activitysim/core/interaction_simulate.py +++ b/activitysim/core/interaction_simulate.py @@ -171,7 +171,7 @@ def replace_in_index_level(mi, level, *repls): timelogger.mark("sharrow preamble", True, logger, trace_label) - sh_util, sh_flow = apply_flow( + sh_util, sh_flow, sh_tree = apply_flow( state, spec_sh, df, @@ -187,10 +187,13 @@ def replace_in_index_level(mi, level, *repls): index=df.index if extra_data is None else None, ) chunk_sizer.log_df(trace_label, "sh_util", None) # hand off to caller + if sharrow_enabled != "test": + # if not testing sharrow, we are done with this object now. + del sh_util timelogger.mark("sharrow flow", True, logger, trace_label) else: - sh_util, sh_flow = None, None + sh_util, sh_flow, sh_tree = None, None, None timelogger.mark("sharrow flow", False) if ( @@ -404,9 +407,9 @@ def to_series(x): if sh_flow is not None and trace_rows is not None and trace_rows.any(): assert type(trace_rows) == np.ndarray sh_utility_fat = sh_flow.load_dataarray( - # sh_flow.tree.replace_datasets( - # df=df.iloc[trace_rows], - # ), + sh_tree.replace_datasets( + df=df.iloc[trace_rows], + ), dtype=np.float32, ) sh_utility_fat = sh_utility_fat[trace_rows, :] diff --git a/activitysim/core/los.py b/activitysim/core/los.py index 9d2136098..d0cf66a3b 100644 --- a/activitysim/core/los.py +++ b/activitysim/core/los.py @@ -845,7 +845,9 @@ def get_tappairs3d(self, otap, dtap, dim3, key): return s.values - def skim_time_period_label(self, time_period, fillna=None): + def skim_time_period_label( + self, time_period, fillna=None, as_cat=False, broadcast_to=None + ): """ convert time period times to skim time period labels (e.g. 9 -> 'AM') @@ -873,6 +875,14 @@ def skim_time_period_label(self, time_period, fillna=None): assert 0 == model_time_window_min % period_minutes total_periods = model_time_window_min / period_minutes + try: + time_label_dtype = self.skim_dicts["taz"].time_label_dtype + except (KeyError, AttributeError): + # if the "taz" skim_dict is missing, or if using old SkimDict + # instead of SkimDataset, this labeling shortcut is unavailable. + time_label_dtype = str + as_cat = False + # FIXME - eventually test and use np version always? if np.isscalar(time_period): bin = ( @@ -888,6 +898,12 @@ def skim_time_period_label(self, time_period, fillna=None): result = self.skim_time_periods["labels"].get(bin, default=default) else: result = self.skim_time_periods["labels"][bin] + if broadcast_to is not None: + result = pd.Series( + data=result, + index=broadcast_to, + dtype=time_label_dtype if as_cat else str, + ) else: result = pd.cut( time_period, @@ -898,8 +914,10 @@ def skim_time_period_label(self, time_period, fillna=None): if fillna is not None: default = self.skim_time_periods["labels"][fillna] result = result.fillna(default) - result = result.astype(str) - + if as_cat: + result = result.astype(time_label_dtype) + else: + result = result.astype(str) return result def get_tazs(self, state): diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py index 1763d17bf..9dda2a0b2 100644 --- a/activitysim/core/simulate.py +++ b/activitysim/core/simulate.py @@ -536,7 +536,7 @@ def eval_utilities( locals_dict.update(state.get_global_constants()) if locals_d is not None: locals_dict.update(locals_d) - sh_util, sh_flow = apply_flow( + sh_util, sh_flow, sh_tree = apply_flow( state, spec_sh, choosers, @@ -652,7 +652,7 @@ def eval_utilities( if sh_flow is not None: try: data_sh = sh_flow.load( - sh_flow.tree.replace_datasets( + sh_tree.replace_datasets( df=choosers.iloc[offsets], ), dtype=np.float32, @@ -731,7 +731,7 @@ def eval_utilities( ) print(f"{sh_util.shape=}") print(misses) - _sh_flow_load = sh_flow.load() + _sh_flow_load = sh_flow.load(sh_tree) print("possible problematic expressions:") for expr_n, expr in enumerate(exprs): closeness = np.isclose( diff --git a/activitysim/core/skim_dataset.py b/activitysim/core/skim_dataset.py index 759ecdead..e6528f1ea 100644 --- a/activitysim/core/skim_dataset.py +++ b/activitysim/core/skim_dataset.py @@ -33,6 +33,10 @@ def __init__(self, dataset): self.time_map = { j: i for i, j in enumerate(self.dataset.indexes["time_period"]) } + self.time_label_dtype = pd.api.types.CategoricalDtype( + self.dataset.indexes["time_period"], + ordered=True, + ) self.usage = set() # track keys of skims looked up @property @@ -184,6 +188,10 @@ def __init__(self, dataset, orig_key, dest_key, time_key=None, *, time_map=None) } else: self.time_map = time_map + self.time_label_dtype = pd.api.types.CategoricalDtype( + self.dataset.indexes["time_period"], + ordered=True, + ) @property def odim(self): @@ -246,6 +254,11 @@ def set_df(self, df): ): logger.info(f"natural use for time_period={self.time_key}") positions["time_period"] = df[self.time_key] + elif ( + df[self.time_key].dtype == "category" + and df[self.time_key].dtype == self.time_label_dtype + ): + positions["time_period"] = df[self.time_key].cat.codes else: logger.info(f"vectorize lookup for time_period={self.time_key}") positions["time_period"] = pd.Series( @@ -257,11 +270,18 @@ def set_df(self, df): self.positions = {} for k, v in positions.items(): try: - self.positions[k] = v.astype(int) - except TypeError: - # possibly some missing values that are not relevant, - # fill with zeros to continue. - self.positions[k] = v.fillna(0).astype(int) + is_int = np.issubdtype(v.dtype, np.integer) + except Exception: + is_int = False + if is_int: + self.positions[k] = v + else: + try: + self.positions[k] = v.astype(int) + except TypeError: + # possibly some missing values that are not relevant, + # fill with zeros to continue. + self.positions[k] = v.fillna(0).astype(int) else: self.positions = pd.DataFrame(positions).astype(int) diff --git a/conda-environments/github-actions-tests.yml b/conda-environments/github-actions-tests.yml index e4d44e8d7..c7cfd39e5 100644 --- a/conda-environments/github-actions-tests.yml +++ b/conda-environments/github-actions-tests.yml @@ -21,6 +21,7 @@ dependencies: - platformdirs = 3.2.* - psutil = 5.9.* - pyarrow = 11.* +- pydantic = 1.10.* - pypyr = 5.8.* - pytables >= 3.5.1,<3.7 # orca's constraint - pytest = 7.2.*