Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/materials' into materials
Browse files Browse the repository at this point in the history
  • Loading branch information
romainsacchi committed Jul 15, 2023
2 parents e2d3de7 + 0d07b40 commit bd71d9e
Showing 1 changed file with 22 additions and 34 deletions.
56 changes: 22 additions & 34 deletions premise/metals.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import uuid
from pathlib import Path

import country_converter as coco
import numpy as np
import pandas as pd
import wurst
import yaml
import country_converter as coco

from .export import biosphere_flows_dictionary
from .transformation import (
Expand Down Expand Up @@ -53,6 +53,7 @@ def load_BGS_mapping():
df = pd.read_excel(filepath, sheet_name="Sheet1")
return df


def get_ecoinvent_metal_factors():
"""
Load dataframe with ecoinvent factors for metals
Expand Down Expand Up @@ -84,6 +85,7 @@ def load_post_allocation_correction_factors():
factors = yaml.safe_load(stream)
return factors


def fetch_mapping(filepath: str) -> dict:
"""Returns a dictionary from a YML file"""

Expand Down Expand Up @@ -307,7 +309,6 @@ def update_metal_use(

return dataset


def post_allocation_correction(self):
"""
Correct for post-allocation in the database.
Expand All @@ -329,21 +330,24 @@ def post_allocation_correction(self):
"amount": dataset["additional flow"]["amount"],
"unit": dataset["additional flow"]["unit"],
"type": "biosphere",
"categories": tuple(dataset["additional flow"]["categories"].split("::")),
"categories": tuple(
dataset["additional flow"]["categories"].split("::")
),
"input": (
"biosphere3",
self.biosphere_flow_codes[
dataset["additional flow"]["name"],
dataset["additional flow"]["categories"].split("::")[0],
dataset["additional flow"]["categories"].split("::")[1],
dataset["additional flow"]["unit"],
]
],
),
}
)

def create_new_mining_activity(self, name, reference_product, location, new_location):

def create_new_mining_activity(
self, name, reference_product, location, new_location
):
try:
ws.get_one(
self.database,
Expand All @@ -355,7 +359,6 @@ def create_new_mining_activity(self, name, reference_product, location, new_loca
return None

except ws.NoResults:

try:
original = ws.get_one(
self.database,
Expand All @@ -364,10 +367,14 @@ def create_new_mining_activity(self, name, reference_product, location, new_loca
ws.equals("location", location),
)
except ws.NoResults:
print(f"No original dataset found for {name}, {reference_product} in {location}")
print(
f"No original dataset found for {name}, {reference_product} in {location}"
)
return None
except ws.MultipleResults:
print(f"Multiple original datasets found for {name}, {reference_product} in {location}")
print(
f"Multiple original datasets found for {name}, {reference_product} in {location}"
)
return None

dataset = wurst.copy_to_new_location(original, new_location)
Expand All @@ -385,7 +392,6 @@ def convert_long_to_short_country_name(self, country_long):
country_short = coco.convert(country_long, to="ISO2")

if isinstance(country_short, list):

if country_long == "France (French Guiana)":
country_short = "GF"
else:
Expand All @@ -399,7 +405,6 @@ def convert_long_to_short_country_name(self, country_long):
return country_short

def create_region_specific_market(self, row):

new_location = self.convert_long_to_short_country_name(row["Country"])

if new_location is None:
Expand Down Expand Up @@ -431,10 +436,7 @@ def create_region_specific_market(self, row):

# if not, we create it
dataset = self.create_new_mining_activity(
name,
reference_product,
location,
new_location
name, reference_product, location, new_location
)

if dataset is None:
Expand All @@ -443,13 +445,7 @@ def create_region_specific_market(self, row):
# add new dataset to database
self.database.append(dataset)

self.modified_datasets[
(
self.model,
self.scenario,
self.year
)
][
self.modified_datasets[(self.model, self.scenario, self.year)][
"created"
].append(
(
Expand All @@ -471,11 +467,10 @@ def create_region_specific_market(self, row):
}

def create_market(self, metal, df):

# check if market already exists
# if so, remove it
for ds in self.database:
if ds['name'] == f"market for {metal.lower()}":
if ds["name"] == f"market for {metal.lower()}":
self.database.remove(ds)
# add it to self.modified_datasets
self.modified_datasets[(self.model, self.scenario, self.year)][
Expand Down Expand Up @@ -511,10 +506,7 @@ def create_market(self, metal, df):
}

dataset["exchanges"].extend(
[
self.create_region_specific_market(row)
for _, row in df.iterrows()
]
[self.create_region_specific_market(row) for _, row in df.iterrows()]
)

# filter out None
Expand All @@ -523,7 +515,6 @@ def create_market(self, metal, df):
return dataset

def create_metal_markets(self):

self.post_allocation_correction()

print("Creating metal markets")
Expand Down Expand Up @@ -553,8 +544,7 @@ def create_metal_markets(self):
)

dataframe_parent = dataframe.loc[
(dataframe["Share_2017_2021"].isnull())
& (~dataframe["Region"].isnull())
(dataframe["Share_2017_2021"].isnull()) & (~dataframe["Region"].isnull())
]

print("Creating additional mining processes")
Expand All @@ -563,7 +553,7 @@ def create_metal_markets(self):
row["Process"],
row["Reference product"],
row["Region"],
self.convert_long_to_short_country_name(row["Country"])
self.convert_long_to_short_country_name(row["Country"]),
)

if dataset is None:
Expand All @@ -581,10 +571,8 @@ def create_metal_markets(self):
dataset["location"],
dataset["unit"],
)

)


def write_log(self, dataset, status="created"):
"""
Write log file.
Expand Down

0 comments on commit bd71d9e

Please sign in to comment.