Skip to content

Commit

Permalink
Fix multiprocessing implementation for matrices export.
Browse files Browse the repository at this point in the history
  • Loading branch information
romainsacchi committed Jul 30, 2023
1 parent 6b2251f commit 1217a93
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 12 deletions.
3 changes: 2 additions & 1 deletion premise/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__all__ = ("NewDatabase", "clear_cache", "get_regions_definition")
__all__ = ("NewDatabase", "clear_cache", "get_regions_definition", "PathwaysDataPackage")
__version__ = (1, 5, 9)

from pathlib import Path
Expand All @@ -8,4 +8,5 @@
VARIABLES_DIR = Path(__file__).resolve().parent / "iam_variables_mapping"

from .ecoinvent_modification import NewDatabase
from .pathways import PathwaysDataPackage
from .utils import clear_cache, get_regions_definition
55 changes: 44 additions & 11 deletions premise/ecoinvent_modification.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ def _update_all(
return scenario, modified_datasets


def _export_to_matrices(obj):
    # Module-level wrapper so the export step can be dispatched via
    # ``multiprocessing.Pool.map`` — Pool pickles its callable, and a
    # module-level function is picklable where a bound method/lambda
    # may not be.  ``obj`` is expected to expose ``export_db_to_matrices()``
    # (an ``Export`` instance in this file's usage).
    obj.export_db_to_matrices()

class NewDatabase:
"""
Class that represents a new wurst inventory database, modified according to IAM data.
Expand Down Expand Up @@ -528,7 +531,7 @@ def __init__(
self.database = self.__find_cached_db(
source_db, keep_uncertainty_data=keep_uncertainty_data
)
# print("Done!")
print("Done!")
else:
self.database = self.__clean_database(
keep_uncertainty_data=keep_uncertainty_data
Expand All @@ -547,7 +550,7 @@ def __init__(
data = self.__import_additional_inventories(self.additional_inventories)
self.database.extend(data)

# print("Done!")
print("Done!")

print("\n/////////////////////// EXTRACTING IAM DATA ////////////////////////")

Expand Down Expand Up @@ -575,7 +578,7 @@ def _fetch_iam_data(scenario):
with Pool(processes=multiprocessing.cpu_count()) as pool:
pool.map(_fetch_iam_data, self.scenarios)

# print("Done!")
print("Done!")

def __find_cached_db(self, db_name: str, keep_uncertainty_data: bool) -> List[dict]:
"""
Expand Down Expand Up @@ -807,6 +810,8 @@ def update_electricity(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_dac(self) -> None:
"""
This method will update the Direct Air Capture (DAC) inventories
Expand All @@ -833,6 +838,8 @@ def update_dac(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_fuels(self) -> None:
"""
This method will update the fuels inventories
Expand All @@ -857,6 +864,8 @@ def update_fuels(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_cement(self) -> None:
"""
This method will update the cement inventories
Expand All @@ -881,6 +890,8 @@ def update_cement(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_steel(self) -> None:
"""
This method will update the steel inventories
Expand All @@ -905,6 +916,8 @@ def update_steel(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_cars(self) -> None:
"""
This method will update the cars inventories
Expand All @@ -930,6 +943,8 @@ def update_cars(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_two_wheelers(self) -> None:
"""
This method will update the two-wheelers inventories
Expand All @@ -955,6 +970,8 @@ def update_two_wheelers(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_trucks(self) -> None:
"""
This method will update the trucks inventories
Expand Down Expand Up @@ -982,6 +999,8 @@ def update_trucks(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_buses(self) -> None:
"""
This method will update the buses inventories
Expand All @@ -1008,6 +1027,8 @@ def update_buses(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_external_scenario(self):
if self.datapackages:
for i, scenario in enumerate(self.scenarios):
Expand Down Expand Up @@ -1052,6 +1073,8 @@ def update_external_scenario(self):
scenario["database"] = external_scenario.database
print(f"Log file of exchanges saved under {DATA_DIR / 'logs'}.")

print("Done!\n")

def update_emissions(self) -> None:
"""
This method will update the hot pollutants emissions
Expand All @@ -1078,6 +1101,8 @@ def update_emissions(self) -> None:
self.scenarios[s] = results[s][0]
self.modified_datasets.update(results[s][1])

print("Done!\n")

def update_all(self) -> None:
"""
Shortcut method to execute all transformation functions.
Expand Down Expand Up @@ -1135,7 +1160,7 @@ def write_superstructure_db_to_brightway(
cache = {}

# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
Expand Down Expand Up @@ -1212,7 +1237,7 @@ def write_db_to_brightway(self, name: [str, List[str]] = None):
cache = {}

# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
Expand Down Expand Up @@ -1278,9 +1303,11 @@ def write_db_to_matrices(self, filepath: str = None):

cache = {}



# use multiprocessing to speed up the process
# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
Expand All @@ -1291,12 +1318,18 @@ def write_db_to_matrices(self, filepath: str = None):
)
for scenario in self.scenarios
]
self.scenarios, cache = pool.starmap(_prepare_database, args)
results = pool.starmap(_prepare_database, args)

for s, scenario in enumerate(self.scenarios):
self.scenarios[s] = results[s][0]
cache.update(results[s][1])

with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(scenario, filepath[scen], self.version)
Export(scenario, filepath[scen], self.version)
for scen, scenario in enumerate(self.scenarios)
]
pool.starmap(Export().export_db_to_matrices, args)
pool.map(_export_to_matrices, args)

# generate scenario report
self.generate_scenario_report()
Expand All @@ -1322,7 +1355,7 @@ def write_db_to_simapro(self, filepath: str = None):
cache = {}

# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
Expand Down Expand Up @@ -1359,7 +1392,7 @@ def write_datapackage(self, name: str = f"datapackage_{date.today()}"):

cache = {}
# use multiprocessing to speed up the process
with Pool(processes=multiprocessing.cpu_count()) as pool:
with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
args = [
(
scenario,
Expand Down
1 change: 1 addition & 0 deletions premise/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ def __init__(
filepath: Union[list[Path], list[Union[Path, Any]]] = None,
version: str = None,
):

self.db = scenario["database"]
self.model = scenario["model"]
self.scenario = scenario["pathway"]
Expand Down
107 changes: 107 additions & 0 deletions premise/pathways.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from typing import List
import copy
from datetime import date
from multiprocessing import Pool as ProcessPool
import multiprocessing
from pathlib import Path
from datapackage import Package

from . import __version__
from .ecoinvent_modification import NewDatabase
from .export import (
Export,
_prepare_database,
build_datapackage,
generate_scenario_factor_file,
generate_superstructure_db,
)


class PathwaysDataPackage:
    """
    Bundle premise scenario databases into a single ``datapackage`` archive.

    Thin orchestration layer over :class:`NewDatabase`: the constructor
    builds the underlying database object; :meth:`create_datapackage`
    applies all IAM-based transformations, exports the scenario matrices
    as CSV files into the current working directory, and zips them
    together with an inferred descriptor into ``<name>.zip``.
    """

    def __init__(
        self,
        scenarios: List[dict],
        source_version: str = "3.9",
        source_type: str = "brightway",
        key: bytes = None,
        source_db: str = None,
        source_file_path: str = None,
        additional_inventories: List[dict] = None,
        system_model: str = "cutoff",
        system_args: dict = None,
        external_scenarios: list = None,
        gains_scenario="CLE",
        use_absolute_efficiency=False,
    ):
        """
        Store scenario metadata and build the wrapped :class:`NewDatabase`.

        All parameters are forwarded verbatim to :class:`NewDatabase`;
        see that class for their semantics.
        """
        self.scenarios = scenarios
        self.source_db = source_db
        self.source_version = source_version
        self.key = key
        # NOTE(review): never reassigned anywhere in this class — presumably
        # reserved for future use; confirm before relying on it.
        self.years = None

        # All heavy lifting (database extraction, caching, IAM data import)
        # happens inside the NewDatabase constructor.
        self.datapackage = NewDatabase(
            scenarios=scenarios,
            source_version=source_version,
            source_type=source_type,
            key=key,
            source_db=source_db,
            source_file_path=source_file_path,
            additional_inventories=additional_inventories,
            system_model=system_model,
            system_args=system_args,
            external_scenarios=external_scenarios,
            gains_scenario=gains_scenario,
            use_absolute_efficiency=use_absolute_efficiency,
        )

    def create_datapackage(self, name: str = None):
        """
        Apply all transformations, then export the scenario datapackage.

        :param name: base name of the resulting ``<name>.zip`` archive.
            Defaults to ``pathways_<today>``, computed at call time.
            (A literal ``f"pathways_{date.today()}"`` default would be
            evaluated once at import time, freezing the date for
            long-running processes — hence the ``None`` sentinel.)
        """
        if name is None:
            name = f"pathways_{date.today()}"
        self.datapackage.update_all()
        self.export_datapackage(name)

    def export_datapackage(self, name: str):
        """
        Write scenario matrices (CSV, in the current working directory)
        and package them into ``<name>.zip``.
        """
        self.datapackage.write_db_to_matrices()
        self.build_datapackage(name)

    def build_datapackage(self, name):
        """
        Create and export a scenario datapackage.

        Infers resources from every ``*.csv`` under the current working
        directory, fills in descriptor metadata (name, premise version,
        one entry per scenario, keywords, license) and saves the package
        as ``<name>.zip`` next to the CSV files.

        :param name: datapackage name, also used for the zip file name.
        """
        package = Package(base_path=Path.cwd())
        # Picks up every CSV below cwd — including files not produced by
        # this run, if any are present.
        package.infer("**/*.csv")
        package.descriptor["name"] = name
        package.descriptor["title"] = name.capitalize()
        package.descriptor[
            "description"
        ] = f"Data package generated by premise {__version__}."
        package.descriptor["premise version"] = str(__version__)
        package.descriptor["scenarios"] = [
            {
                "name": f"{s['model'].upper()} - {s['pathway']}",
                "description": f"Prospective db, "
                f"based on {s['model'].upper()}, "
                f"pathway {s['pathway']}.",
            }
            for s in self.scenarios
        ]
        package.descriptor["keywords"] = [
            "ecoinvent",
            "scenario",
            "data package",
            "premise",
            "pathways",
        ]
        package.descriptor["licenses"] = [
            {
                "id": "CC0-1.0",
                "title": "CC0 1.0",
                "url": "https://creativecommons.org/publicdomain/zero/1.0/",
            }
        ]
        package.commit()

        # save the datapackage
        package.save(f"{name}.zip")

        print(f"Data package saved at {Path.cwd() / f'{name}.zip'}")

0 comments on commit 1217a93

Please sign in to comment.