BayDAG Contribution #14: Increasing Larch Loading for NMTF #780

Merged · 2 commits · Apr 1, 2024
99 changes: 98 additions & 1 deletion activitysim/estimation/larch/nonmand_tour_freq.py
@@ -7,6 +7,8 @@
from larch import DataFrames, Model
from larch.log import logger_name
from larch.util import Dict
import pickle
from datetime import datetime

from .general import (
apply_coefficients,
@@ -27,6 +29,7 @@ def interaction_simulate_data(
coefficients_files="{segment_name}/{name}_coefficients_{segment_name}.csv",
chooser_data_files="{segment_name}/{name}_choosers_combined.csv",
alt_values_files="{segment_name}/{name}_interaction_expression_values.csv",
segment_subset=[],
):
edb_directory = edb_directory.format(name=name)

@@ -46,21 +49,30 @@ def _read_csv(filename, **kwargs):
alt_values = {}

segment_names = [s["NAME"] for s in settings["SPEC_SEGMENTS"]]
if len(segment_subset) > 0:
assert set(segment_subset).issubset(
set(segment_names)
), f"{segment_subset} is not a subset of {segment_names}"
segment_names = segment_subset

for segment_name in segment_names:
print(f"Loading EDB for {segment_name} segment")
coefficients[segment_name] = _read_csv(
coefficients_files.format(name=name, segment_name=segment_name),
index_col="coefficient_name",
comment="#",
)
chooser_data[segment_name] = _read_csv(
chooser_data_files.format(name=name, segment_name=segment_name),
)
alt_values[segment_name] = _read_csv(
alt_values_files.format(name=name, segment_name=segment_name),
comment="#",
)

spec = _read_csv(
spec_file,
comment="#",
)
spec = remove_apostrophes(spec, ["Label"])
# alt_names = list(spec.columns[3:])
@@ -118,10 +130,80 @@ def unavail(model, x_ca):
return unav


# FIXME move all this to larch/general.py? see ActivitySim issue #686
def _read_feather(filename, name, edb_directory, **kwargs):
filename = filename.format(name=name)
return pd.read_feather(os.path.join(edb_directory, filename), **kwargs)


def _to_feather(df, filename, name, edb_directory, **kwargs):
filename = filename.format(name=name)
return df.to_feather(os.path.join(edb_directory, filename), **kwargs)


def _read_pickle(filename, name, edb_directory, **kwargs):
filename = filename.format(name=name)
return pd.read_pickle(os.path.join(edb_directory, filename), **kwargs)


def _to_pickle(df, filename, name, edb_directory, **kwargs):
filename = filename.format(name=name)
return df.to_pickle(os.path.join(edb_directory, filename), **kwargs)


def _file_exists(filename, name, edb_directory):
filename = filename.format(name=name)
return os.path.exists(os.path.join(edb_directory, filename))


def get_x_ca_df(alt_values, name, edb_directory, num_chunks):
def split(a, n):
k, m = divmod(len(a), n)
return (a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n))

# process x_ca with cv_to_ca with or without chunking
x_ca_pickle_file = "{name}_x_ca.pkl"
if num_chunks == 1:
x_ca = cv_to_ca(alt_values)
elif _file_exists(x_ca_pickle_file, name, edb_directory):
# if a pickle file from previous x_ca processing exists, load it to save time
time_start = datetime.now()
x_ca = _read_pickle(x_ca_pickle_file, name, edb_directory)
print(
f"x_ca data loaded from {name}_x_ca.fea - time elapsed {(datetime.now() - time_start).total_seconds()}"
)
else:
time_start = datetime.now()
# calculate chunk size (approximate number of rows per chunk) from num_chunks
chunking_size = round(len(alt_values) / num_chunks, 3)
print(
f"Using {num_chunks} chunks results in chunk size of {chunking_size} (of {len(alt_values)} total rows)"
)
all_chunk_ids = list(alt_values.index.get_level_values(0).unique())
split_ids = list(split(all_chunk_ids, num_chunks))
x_ca_list = []
for i, chunk_ids in enumerate(split_ids):
alt_values_i = alt_values.loc[chunk_ids]
x_ca_i = cv_to_ca(alt_values_i)
x_ca_list.append(x_ca_i)
print(
f"\rx_ca_i compute done for chunk {i+1}/{num_chunks} - time elapsed {(datetime.now() - time_start).total_seconds()}"
)
x_ca = pd.concat(x_ca_list, axis=0)
# save final x_ca result as pickle file to save time for future data loading
_to_pickle(x_ca, x_ca_pickle_file, name, edb_directory)
print(
f"x_ca compute done - time elapsed {(datetime.now() - time_start).total_seconds()}"
)
return x_ca


def nonmand_tour_freq_model(
edb_directory="output/estimation_data_bundle/{name}/",
return_data=False,
condense_parameters=False,
segment_subset=[],
num_chunks=1,
):
"""
Prepare nonmandatory tour frequency models for estimation.
@@ -141,10 +223,16 @@ def nonmand_tour_freq_model(
data = interaction_simulate_data(
name="non_mandatory_tour_frequency",
edb_directory=edb_directory,
segment_subset=segment_subset,
)

settings = data.settings
segment_names = [s["NAME"] for s in settings["SPEC_SEGMENTS"]]
if len(segment_subset) > 0:
assert set(segment_subset).issubset(
set(segment_names)
), f"{segment_subset} is not a subset of {segment_names}"
segment_names = segment_subset
if condense_parameters:
data.relabel_coef = link_same_value_coefficients(
segment_names, data.coefficients, data.spec
@@ -157,6 +245,7 @@

m = {}
for segment_name in segment_names:
print(f"Creating larch model for {segment_name}")
segment_model = m[segment_name] = Model()
# One of the alternatives is coded as 0, so
# we need to explicitly initialize the MNL nesting graph
@@ -178,7 +267,15 @@
.set_index("person_id")
.rename(columns={"TAZ": "HOMETAZ"})
)
x_ca = cv_to_ca(alt_values[segment_name].set_index(["person_id", "variable"]))
print("\t performing cv to ca step")
# x_ca = cv_to_ca(alt_values[segment_name].set_index(["person_id", "variable"]))
x_ca = get_x_ca_df(
alt_values=alt_values[segment_name].set_index(["person_id", "variable"]),
name=segment_name,
edb_directory=edb_directory.format(name="non_mandatory_tour_frequency"),
num_chunks=num_chunks,
)

d = DataFrames(
co=x_co,
ca=x_ca,
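
For reference, a minimal usage sketch of the new keyword arguments (not part of this PR's diff; the segment name shown and the return handling are assumptions and must match the SPEC_SEGMENTS and layout of the local estimation data bundle):

from activitysim.estimation.larch.nonmand_tour_freq import nonmand_tour_freq_model

# Hypothetical example: estimate only one person-type segment and split the
# cv_to_ca conversion of the alternatives table into 4 chunks to limit memory.
result = nonmand_tour_freq_model(
    edb_directory="output/estimation_data_bundle/{name}/",
    return_data=True,
    segment_subset=["PTYPE_FULL"],  # must be a NAME listed in SPEC_SEGMENTS
    num_chunks=4,
)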