Skip to content

Commit

Permalink
Merge pull request #121 from SANDAG/ABM3_develop_skim_mem
Browse files Browse the repository at this point in the history
Load skims into shared memory to be accessed by later models
  • Loading branch information
bhargavasana authored Apr 12, 2024
2 parents 3e7c8ee + 5ea6487 commit 9a829d3
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 64 deletions.
2 changes: 1 addition & 1 deletion src/asim/configs/airport.CBX/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ inherit_settings: False
# number of households to simulate

households_sample_size: 0
multiprocess: False
multiprocess: True
strict: False
mem_tick: 1
num_processes: 1
Expand Down
4 changes: 2 additions & 2 deletions src/asim/configs/airport.SAN/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ inherit_settings: False
# number of households to simulate

households_sample_size: 0
multiprocess: False
multiprocess: True
strict: False
mem_tick: 1
num_processes: 1
Expand All @@ -19,7 +19,7 @@ data_dir: data

# chooser chunk size in gigabytes
# target top memory usage during activitysim run (including shared memory, loaded tables, and transient memory usage)
chunk_size: 50_000_000_000
chunk_size: 400_000_000_000
#chunk_size: 0

# minimum fraction of total chunk_size to reserve for adaptive chunking
Expand Down
2 changes: 2 additions & 0 deletions src/asim/configs/common/network_los.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ write_skim_cache: False
taz_skims:
- traffic_skims*.omx
- transit_skims*.omx
- dest_pmsa.omx
- dest_poi.omx

maz: land_use.csv # mgra.csv

Expand Down
14 changes: 3 additions & 11 deletions src/asim/configs/common_airport/network_los.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,9 @@ write_skim_cache: False
trace_tvpb_cache_as_csv: False

taz_skims:
- traffic_skims_EA.omx
- traffic_skims_AM.omx
- traffic_skims_MD.omx
- traffic_skims_PM.omx
- traffic_skims_EV.omx
- transit_skims_EA.omx
- transit_skims_AM.omx
- transit_skims_MD.omx
- transit_skims_PM.omx
- transit_skims_EV.omx
# - transit_skims.omx
- traffic_skims*.omx
- transit_skims*.omx
- dest_pmsa.omx
- dest_poi.omx


Expand Down
13 changes: 3 additions & 10 deletions src/asim/configs/crossborder/network_los.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,10 @@ read_skim_cache: False
write_skim_cache: False

taz_skims:
- traffic_skims_EA.omx
- traffic_skims_AM.omx
- traffic_skims_MD.omx
- traffic_skims_PM.omx
- traffic_skims_EV.omx
- transit_skims_ea.omx
- transit_skims_am.omx
- transit_skims_md.omx
- transit_skims_pm.omx
- transit_skims_ev.omx
- traffic_skims*.omx
- transit_skims*.omx
- dest_pmsa.omx
- dest_poi.omx

maz: mazs_xborder.csv # mgra.csv

Expand Down
109 changes: 69 additions & 40 deletions src/main/emme/toolbox/master_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import shutil

import multiprocessing
import signal

_join = os.path.join
_dir = os.path.dirname
Expand Down Expand Up @@ -658,47 +659,75 @@ def __call__(self, main_directory, scenario_id, scenario_title, emmebank_title,
"runSandagAbm_Preprocessing.cmd",
[drive, drive + path_forward_slash, msa_iteration, scenarioYear],
"Creating all the required files to run the ActivitySim models", capture_output=True)
if not skipABMResident[iteration]:
self.set_sample_rate(_join(self._path, r"src\asim\configs\resident\settings_mp.yaml"), int(sample_rate[iteration] * hh_resident_size))
self.run_proc(
"runSandagAbm_ActivitySimResident.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim resident model", capture_output=True)
if not skipABMAirport[iteration]:
hh_airport_size = {}
for airport in ["san", "cbx"]:
householdFile = pd.read_csv(_join(self._path, "input", "households_airport.{}.csv".format(airport)))
hh_airport_size[airport] = len(householdFile)

skip_asim = skipABMResident[iteration] and skipABMAirport[iteration] and skipABMXborder[iteration] and skipABMVisitor[iteration]

if not skip_asim:
mem_manager = _subprocess.Popen(
[_join(self._path, "bin", "manage_skim_mem.cmd"),
drive, drive + path_forward_slash],
stdout=_subprocess.PIPE, stderr=_subprocess.PIPE,
stdin=_subprocess.PIPE, creationflags=_subprocess.CREATE_NEW_PROCESS_GROUP
)
try:
if not skipABMResident[iteration]:
self.set_sample_rate(_join(self._path, r"src\asim\configs\resident\settings_mp.yaml"), int(sample_rate[iteration] * hh_resident_size))
self.run_proc(
"runSandagAbm_ActivitySimResident.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim resident model", capture_output=True)
if not skipABMAirport[iteration]:
hh_airport_size = {}
for airport in ["san", "cbx"]:
householdFile = pd.read_csv(_join(self._path, "input", "households_airport.{}.csv".format(airport)))
hh_airport_size[airport] = len(householdFile)
del(householdFile)
self.set_sample_rate(_join(self._path, r"src\asim\configs\airport.CBX\settings.yaml"), int(sample_rate[iteration] * hh_airport_size["cbx"]))
self.set_sample_rate(_join(self._path, r"src\asim\configs\airport.SAN\settings.yaml"), int(sample_rate[iteration] * hh_airport_size["san"]))
self.run_proc(
"runSandagAbm_ActivitySimAirport.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim airport models", capture_output=True)
if (not skipABMXborderWait) and (iteration == 0):
self.run_proc(
"runSandagAbm_ActivitySimXborderWaitModel.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim wait time models", capture_output=True)
if not skipABMXborder[iteration]:
householdFile = pd.read_csv(_join(self._path, "input", "households_xborder.csv"))
hh_xborder_size = len(householdFile)
del(householdFile)
self.set_sample_rate(_join(self._path, r"src\asim\configs\airport.CBX\settings.yaml"), int(sample_rate[iteration] * hh_airport_size["cbx"]))
self.set_sample_rate(_join(self._path, r"src\asim\configs\airport.SAN\settings.yaml"), int(sample_rate[iteration] * hh_airport_size["san"]))
self.run_proc(
"runSandagAbm_ActivitySimAirport.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim airport models", capture_output=True)
if (not skipABMXborderWait) and (iteration == 0):
self.run_proc(
"runSandagAbm_ActivitySimXborderWaitModel.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim wait time models", capture_output=True)
if not skipABMXborder[iteration]:
householdFile = pd.read_csv(_join(self._path, "input", "households_xborder.csv"))
hh_xborder_size = len(householdFile)
del(householdFile)
self.set_sample_rate(_join(self._path, r"src\asim\configs\crossborder\settings.yaml"), int(sample_rate[iteration] * hh_xborder_size))
self.run_proc(
"runSandagAbm_ActivitySimXborder.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim crossborder model", capture_output=True)
if not skipABMVisitor[iteration]:
householdFile = pd.read_csv(_join(self._path, "input", "households_visitor.csv"))
hh_visitor_size = len(householdFile)
del(householdFile)
self.set_sample_rate(_join(self._path, r"src\asim\configs\visitor\settings.yaml"), int(sample_rate[iteration] * hh_visitor_size))
self.run_proc(
"runSandagAbm_ActivitySimVisitor.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim visitor model", capture_output=True)
self.set_sample_rate(_join(self._path, r"src\asim\configs\crossborder\settings.yaml"), int(sample_rate[iteration] * hh_xborder_size))
self.run_proc(
"runSandagAbm_ActivitySimXborder.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim crossborder model", capture_output=True)
if not skipABMVisitor[iteration]:
householdFile = pd.read_csv(_join(self._path, "input", "households_visitor.csv"))
hh_visitor_size = len(householdFile)
del(householdFile)
self.set_sample_rate(_join(self._path, r"src\asim\configs\visitor\settings.yaml"), int(sample_rate[iteration] * hh_visitor_size))
self.run_proc(
"runSandagAbm_ActivitySimVisitor.cmd",
[drive, drive + path_forward_slash],
"Running ActivitySim visitor model", capture_output=True)
finally:
if not skip_asim:
forced_stop = False
if mem_manager.poll() is None:
mem_manager.stdin.write(b"\n")
_time.sleep(5)
if mem_manager.poll() is None:
mem_manager.send_signal(signal.CTRL_BREAK_EVENT)
forced_stop = True
out, err = mem_manager.communicate()
report = _m.PageBuilder(title="Command report")
self.add_html(report, 'Output:<br><br><div class="preformat">%s</div>' % out)
if err:
self.add_html(report, 'Error message(s):<br><br><div class="preformat">%s</div>' % err)
_m.logbook_write("Skim shared memory manager process record", report.render())
if mem_manager.returncode != 0 and not forced_stop:
raise Exception("Error in skim shared memory manager, view logbook for details")

if not skipMAASModel[iteration]:
self.run_proc("runMtxMgr.cmd", [drive, drive + path_no_drive], "Start matrix manager")
Expand Down
33 changes: 33 additions & 0 deletions src/main/python/skim_shared_mem_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from multiprocessing import shared_memory
import time

# Find and hold on to skims in shared memory so memory is not released

if __name__ == '__main__':
shared_mem_name = "skim_shared_memory__taz"

try:
test_mem = shared_memory.SharedMemory(name=shared_mem_name)
raise Exception("Skims already exist in shared memory")
except FileNotFoundError:
pass

# Try to get skim from shared memory every 10 minutes
# Timeout after one hour
mem = None
for i in range(6):
time.sleep(600)
try:
mem = shared_memory.SharedMemory(name=shared_mem_name)
print("Skims found in shared memory")
break
except FileNotFoundError:
continue
if mem is None:
print("Timed out, skims not found in shared memory")
else:
# Wait for asim models to complete, then release memory
input()
print("Releasing skims from shared memory")
mem.close()
mem.unlink()
32 changes: 32 additions & 0 deletions src/main/resources/manage_skim_mem.cmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
ECHO ON

set PROJECT_DRIVE=%1
set PROJECT_DIRECTORY=%2

%PROJECT_DRIVE%
cd /d %PROJECT_DIRECTORY%

SET ANACONDA3_DIR=%CONDA_PREFIX%

SET PATH=%ANACONDA3_DIR%\Library\bin;%PATH%
SET PATH=%ANACONDA3_DIR%\Scripts;%ANACONDA3_DIR%\bin;%PATH%

:: setup paths to Python application, Conda script, etc.
SET CONDA3_ACT=%ANACONDA3_DIR%\Scripts\activate.bat

SET CONDA3_DEA=%ANACONDA3_DIR%\Scripts\deactivate.bat

SET CONDA3=%ANACONDA3_DIR%\Scripts\conda.exe

SET PYTHON3=%ANACONDA3_DIR%\envs\asim_baydag\python.exe

ECHO Activate asim_baydag....
CD /d %ANACONDA3_DIR%\Scripts
CALL %CONDA3_ACT% asim_baydag

set MKL_NUM_THREADS=1
set MKL=1

cd /d %PROJECT_DIRECTORY%

%PYTHON3% python/skim_shared_mem_manager.py

0 comments on commit 9a829d3

Please sign in to comment.