diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py
index 3a6c3ae47..d5e1addeb 100644
--- a/activitysim/core/configuration/top.py
+++ b/activitysim/core/configuration/top.py
@@ -3,6 +3,8 @@
 from pathlib import Path
 from typing import Any, Literal
 
+from pydantic import validator
+
 from activitysim.core.configuration.base import PydanticBase, Union
 
@@ -119,6 +121,11 @@ class OutputTables(PydanticBase):
     h5_store: bool = False
     """Write tables into a single HDF5 store instead of individual CSVs."""
 
+    file_type: Literal["csv", "parquet", "h5"] = "csv"
+    """
+    Specifies the file type for output tables. Options are limited to 'csv',
+    'h5', and 'parquet'. Only applied if h5_store is set to False."""
+
     action: str
     """Whether to 'include' or 'skip' the enumerated tables in `tables`."""
diff --git a/activitysim/core/steps/output.py b/activitysim/core/steps/output.py
index 306f2cb44..97f50a57f 100644
--- a/activitysim/core/steps/output.py
+++ b/activitysim/core/steps/output.py
@@ -9,6 +9,7 @@
 import pandas as pd
 import pyarrow as pa
 import pyarrow.csv as csv
+import pyarrow.parquet as parquet
 
 from activitysim.core import configuration, workflow
 from activitysim.core.workflow.checkpoint import CHECKPOINT_NAME
@@ -226,8 +227,13 @@ def write_data_dictionary(state: workflow.State) -> None:
 @workflow.step
 def write_tables(state: workflow.State) -> None:
     """
-    Write pipeline tables as csv files (in output directory) as specified by output_tables list
-    in settings file.
+    Write pipeline tables (in the output directory) as specified by the
+    output_tables list in the settings file. Output to csv, parquet, or a
+    single h5 file is supported.
+
+    'h5_store' defaults to False, which means the output is written to csv.
+    'file_type' defaults to 'csv' but can also be set to 'parquet' or 'h5'.
+    When 'h5_store' is True, 'file_type' is ignored and the outputs are written to h5.
 
     'output_tables' can specify either a list of output tables to include or to skip
     if no output_tables list is specified, then all checkpointed tables will be written
@@ -261,6 +267,16 @@ def write_tables(state: workflow.State) -> None:
         tables:
          - households
 
+    To write tables to parquet files, use the file_type setting:
+
+    ::
+
+        output_tables:
+          file_type: parquet
+          action: include
+          tables:
+            - households
+
     Parameters
     ----------
     output_dir: str
@@ -277,6 +293,7 @@ def write_tables(state: workflow.State) -> None:
     tables = output_tables_settings.tables
     prefix = output_tables_settings.prefix
     h5_store = output_tables_settings.h5_store
+    file_type = output_tables_settings.file_type
     sort = output_tables_settings.sort
 
     registered_tables = state.registered_tables()
@@ -388,14 +405,20 @@ def map_func(x):
         ):
             dt = dt.drop([f"_original_{lookup_col}"])
 
-        if h5_store:
+        if h5_store or file_type == "h5":
             file_path = state.get_output_file_path("%soutput_tables.h5" % prefix)
             dt.to_pandas().to_hdf(
                 str(file_path), key=table_name, mode="a", format="fixed"
             )
+
         else:
-            file_name = f"{prefix}{table_name}.csv"
+            file_name = f"{prefix}{table_name}.{file_type}"
             file_path = state.get_output_file_path(file_name)
 
             # include the index if it has a name or is a MultiIndex
-            csv.write_csv(dt, file_path)
+            if file_type == "csv":
+                csv.write_csv(dt, file_path)
+            elif file_type == "parquet":
+                parquet.write_table(dt, file_path)
+            else:
+                raise ValueError(f"unknown file_type {file_type}")
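
As a quick illustration of the writer dispatch this patch adds (not part of the
diff itself; the table contents and output file names below are invented for
illustration), the branch logic can be exercised standalone with only pyarrow::

    import pyarrow as pa
    import pyarrow.csv as csv
    import pyarrow.parquet as parquet

    # Hypothetical stand-in for one checkpointed pipeline table.
    dt = pa.table({"household_id": [1, 2, 3], "income": [52000, 61000, 38000]})

    for file_type in ("csv", "parquet"):
        file_path = f"final_households.{file_type}"
        if file_type == "csv":
            # same call the existing csv branch makes
            csv.write_csv(dt, file_path)
        elif file_type == "parquet":
            # same call the new parquet branch makes
            parquet.write_table(dt, file_path)
        else:
            raise ValueError(f"unknown file_type {file_type}")

One practical upside of routing both formats through pyarrow: parquet.write_table
preserves the arrow schema (column dtypes, nullability) in the output file, whereas
a csv round-trip loses that information.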