Fix multiprocessing implementation for matrices export.
romainsacchi committed Jul 30, 2023
1 parent 6b2251f commit 41bb114
Showing 2 changed files with 45 additions and 11 deletions.
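
The core of the fix is in `write_db_to_matrices`: instead of calling `pool.starmap(Export().export_db_to_matrices, args)`, the export objects are now built first, one `Export(scenario, filepath[scen], self.version)` per scenario, and a new module-level helper, `_export_to_matrices`, is passed to `pool.map`. Keeping the helper at module level also keeps the work handed to the pool trivially picklable. Below is a minimal, self-contained sketch of that pattern; `DummyExport`, the scenario names, and the output paths are illustrative stand-ins, not premise's actual API.

```python
# Minimal sketch of the corrected export pattern (illustrative only:
# DummyExport stands in for premise's Export class, and the paths/names
# are made up).
import multiprocessing
from multiprocessing import Pool


class DummyExport:
    def __init__(self, scenario, filepath, version):
        self.scenario = scenario
        self.filepath = filepath
        self.version = version

    def export_db_to_matrices(self):
        # Placeholder for writing the scenario's matrices to self.filepath.
        print(f"exporting {self.scenario} ({self.version}) -> {self.filepath}")


def _export_to_matrices(obj):
    # Module-level helper: trivially picklable, so pool.map can ship it
    # (together with each export object) to the worker processes.
    obj.export_db_to_matrices()


if __name__ == "__main__":
    exports = [
        DummyExport(scenario, f"./export_{i}", "3.9")
        for i, scenario in enumerate(["SSP2-Base", "SSP2-RCP26"])
    ]
    with Pool(processes=multiprocessing.cpu_count()) as pool:
        pool.map(_export_to_matrices, exports)
```

In the commit itself the objects are real `Export` instances built from `self.scenarios`, and the `_export_to_matrices` helper is defined just above the `NewDatabase` class, as the first hunk below shows.
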
55 changes: 44 additions & 11 deletions premise/ecoinvent_modification.py
@@ -435,6 +435,9 @@ def _update_all(
return scenario, modified_datasets


def _export_to_matrices(obj):
obj.export_db_to_matrices()

class NewDatabase:
"""
Class that represents a new wurst inventory database, modified according to IAM data.
@@ -528,7 +531,7 @@ def __init__(
self.database = self.__find_cached_db(
source_db, keep_uncertainty_data=keep_uncertainty_data
)
# print("Done!")
print("Done!")
else:
self.database = self.__clean_database(
keep_uncertainty_data=keep_uncertainty_data
@@ -547,7 +550,7 @@ def __init__(
data = self.__import_additional_inventories(self.additional_inventories)
self.database.extend(data)

# print("Done!")
print("Done!")

print("\n/////////////////////// EXTRACTING IAM DATA ////////////////////////")

@@ -575,7 +578,7 @@ def _fetch_iam_data(scenario):
with Pool(processes=multiprocessing.cpu_count()) as pool:
pool.map(_fetch_iam_data, self.scenarios)

# print("Done!")
print("Done!")

def __find_cached_db(self, db_name: str, keep_uncertainty_data: bool) -> List[dict]:
"""
@@ -807,6 +810,8 @@ def update_electricity(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_dac(self) -> None:
"""
This method will update the Direct Air Capture (DAC) inventories
@@ -833,6 +838,8 @@ def update_dac(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_fuels(self) -> None:
"""
This method will update the fuels inventories
@@ -857,6 +864,8 @@ def update_fuels(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_cement(self) -> None:
"""
This method will update the cement inventories
@@ -881,6 +890,8 @@ def update_cement(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_steel(self) -> None:
"""
This method will update the steel inventories
@@ -905,6 +916,8 @@ def update_steel(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_cars(self) -> None:
"""
This method will update the cars inventories
@@ -930,6 +943,8 @@ def update_cars(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_two_wheelers(self) -> None:
"""
This method will update the two-wheelers inventories
@@ -955,6 +970,8 @@ def update_two_wheelers(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_trucks(self) -> None:
"""
This method will update the trucks inventories
@@ -982,6 +999,8 @@ def update_trucks(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_buses(self) -> None:
"""
This method will update the buses inventories
@@ -1008,6 +1027,8 @@ def update_buses(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_external_scenario(self):
if self.datapackages:
for i, scenario in enumerate(self.scenarios):
@@ -1052,6 +1073,8 @@ def update_external_scenario(self):
scenario["database"] = external_scenario.database
print(f"Log file of exchanges saved under {DATA_DIR / 'logs'}.")

print("Done!\n")

def update_emissions(self) -> None:
"""
This method will update the hot pollutants emissions
@@ -1078,6 +1101,8 @@ def update_emissions(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_all(self) -> None:
"""
Shortcut method to execute all transformation functions.
@@ -1135,7 +1160,7 @@ def write_superstructure_db_to_brightway(
cache = {}

# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
Expand Down Expand Up @@ -1212,7 +1237,7 @@ def write_db_to_brightway(self, name: [str, List[str]] = None):
cache = {}

# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
@@ -1278,9 +1303,11 @@ def write_db_to_matrices(self, filepath: str = None):

cache = {}



# use multiprocessing to speed up the process
# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
@@ -1291,12 +1318,18 @@ def write_db_to_matrices(self, filepath: str = None):
)
for scenario in self.scenarios
]
self.scenarios, cache = pool.starmap(_prepare_database, args)
results = pool.starmap(_prepare_database, args)

for s, scenario in enumerate(self.scenarios):
self.scenarios[s] = results[s][0]
cache.update(results[s][1])

with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(scenario, filepath[scen], self.version)
Export(scenario, filepath[scen], self.version)
for scen, scenario in enumerate(self.scenarios)
]
pool.starmap(Export().export_db_to_matrices, args)
pool.map(_export_to_matrices, args)

# generate scenario report
self.generate_scenario_report()
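
The other change in this hunk is how the `starmap` results are consumed. `pool.starmap(_prepare_database, args)` returns one `(scenario, cache)` tuple per scenario, so the previous `self.scenarios, cache = pool.starmap(...)` unpacking was incorrect; the new code collects the list as `results` and folds each pair back in a loop, mirroring the `update_*` methods above. A minimal sketch of that pattern, with a deliberately simplified stand-in for `_prepare_database` (the real one takes more arguments):

```python
# Minimal sketch of collecting starmap results per scenario (simplified
# stand-in: the real _prepare_database takes more arguments than this).
import multiprocessing
from multiprocessing import Pool


def _prepare_database(scenario, cache):
    # Each worker gets its own pickled copy of `cache`, so whatever it adds
    # has to be returned and merged back in the parent process.
    scenario = {**scenario, "prepared": True}
    cache = {**cache, scenario["pathway"]: "prepared"}
    return scenario, cache


if __name__ == "__main__":
    scenarios = [{"pathway": "SSP2-Base"}, {"pathway": "SSP2-RCP26"}]
    cache = {}

    with Pool(processes=multiprocessing.cpu_count()) as pool:
        # One (scenario, cache) tuple per scenario -- a list, not two values.
        results = pool.starmap(_prepare_database, [(s, cache) for s in scenarios])

    for s, _ in enumerate(scenarios):
        scenarios[s] = results[s][0]
        cache.update(results[s][1])

    print(scenarios)
    print(cache)
```
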
@@ -1322,7 +1355,7 @@ def write_db_to_simapro(self, filepath: str = None):
cache = {}

# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
@@ -1359,7 +1392,7 @@ def write_datapackage(self, name: str = f"datapackage_{date.today()}"):

cache = {}
# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
1 change: 1 addition & 0 deletions premise/export.py
@@ -992,6 +992,7 @@ def __init__(
filepath: Union[list[Path], list[Union[Path, Any]]] = None,
version: str = None,
):

self.db = scenario["database"]
self.model = scenario["model"]
self.scenario = scenario["pathway"]
