|
| 1 | +from collections import Counter |
1 | 2 | from pprint import pformat
|
2 | 3 | from typing import Dict, List
|
3 | 4 |
|
4 |
| -from bw2data import get_node |
5 |
| -from bw2data.backends import Exchange |
| 5 | +from bw2data import get_node, labels |
| 6 | +from bw2data.backends import Exchange, Node |
6 | 7 | from bw2data.backends.schema import ExchangeDataset
|
7 | 8 | from bw2data.errors import UnknownObject
|
8 | 9 | from loguru import logger
|
9 | 10 |
|
| 11 | +from multifunctional.errors import MultipleFunctionalExchangesWithSameInput |
10 | 12 |
|
11 |
| -def allocation_before_writing( |
12 |
| - data: Dict[tuple, dict], strategy_label: str |
13 |
| -) -> Dict[tuple, dict]: |
| 13 | + |
| 14 | +def allocation_before_writing(data: Dict[tuple, dict], strategy_label: str) -> Dict[tuple, dict]: |
14 | 15 | """Utility to perform allocation on datasets and expand `data` with allocated processes."""
|
15 | 16 | from . import allocation_strategies
|
16 | 17 |
|
@@ -62,7 +63,7 @@ def add_exchange_input_if_missing(data: dict) -> dict:
|
62 | 63 |
|
63 | 64 |
|
64 | 65 | def update_datasets_from_allocation_results(data: List[dict]) -> None:
|
65 |
| - """Given data from allocation, create or update datasets as needed from `data`.""" |
| 66 | + """Given data from allocation, create, update, or delete datasets as needed from `data`.""" |
66 | 67 | from .node_classes import ReadOnlyProcessWithReferenceProduct
|
67 | 68 |
|
68 | 69 | for ds in data:
|
@@ -98,3 +99,121 @@ def product_as_process_name(data: List[dict]) -> None:
|
98 | 99 | functional_excs = [exc for exc in ds["exchanges"] if exc.get("functional")]
|
99 | 100 | if len(functional_excs) == 1 and functional_excs[0].get("name"):
|
100 | 101 | ds["name"] = functional_excs[0]["name"]
|
| 102 | + |
| 103 | + |
| 104 | +def set_correct_process_type(dataset: Node) -> Node: |
| 105 | + """ |
| 106 | + Change the `type` for an LCI process under certain conditions. |
| 107 | +
|
| 108 | + Only will make changes if the following conditions are met: |
| 109 | +
|
| 110 | + * `type` is `multifunctional` but the dataset is no longer multifunctional -> |
| 111 | + set to either `process` or `processwithreferenceproduct` |
| 112 | + * `type` is `None` or missing -> set to either `process` or `processwithreferenceproduct` |
| 113 | + * `type` is `process` but the dataset also includes an exchange which points to the same node |
| 114 | + -> `processwithreferenceproduct` |
| 115 | +
|
| 116 | + """ |
| 117 | + if dataset.get("type") not in ( |
| 118 | + labels.chimaera_node_default, |
| 119 | + labels.process_node_default, |
| 120 | + "multifunctional", |
| 121 | + None, |
| 122 | + ): |
| 123 | + pass |
| 124 | + elif dataset.multifunctional: |
| 125 | + dataset["type"] = "multifunctional" |
| 126 | + elif any(exc.input == exc.output for exc in dataset.exchanges()): |
| 127 | + if dataset["type"] == "multifunctional": |
| 128 | + logger.debug( |
| 129 | + "Changed %s (%s) type from `multifunctional` to `%s`", |
| 130 | + dataset.get("name"), |
| 131 | + dataset.id, |
| 132 | + labels.chimaera_node_default, |
| 133 | + ) |
| 134 | + dataset["type"] = labels.chimaera_node_default |
| 135 | + elif any(exc.get("functional") for exc in dataset.exchanges()): |
| 136 | + if dataset["type"] == "multifunctional": |
| 137 | + logger.debug( |
| 138 | + "Changed %s (%s) type from `multifunctional` to `%s`", |
| 139 | + dataset.get("name"), |
| 140 | + dataset.id, |
| 141 | + labels.process_node_default, |
| 142 | + ) |
| 143 | + dataset["type"] = labels.process_node_default |
| 144 | + elif ( |
| 145 | + # No production edges -> implicit self production -> chimaera |
| 146 | + not any( |
| 147 | + exc.get("type") in labels.technosphere_positive_edge_types |
| 148 | + for exc in dataset.exchanges() |
| 149 | + ) |
| 150 | + ): |
| 151 | + dataset["type"] = labels.chimaera_node_default |
| 152 | + elif not dataset.get("type"): |
| 153 | + dataset["type"] = labels.process_node_default |
| 154 | + else: |
| 155 | + # No conditions for setting or changing type occurred |
| 156 | + pass |
| 157 | + |
| 158 | + return dataset |
| 159 | + |
| 160 | + |
| 161 | +def purge_expired_linked_readonly_processes(dataset: Node) -> None: |
| 162 | + from .database import MultifunctionalDatabase |
| 163 | + |
| 164 | + if not dataset.get("mf_was_once_allocated"): |
| 165 | + return |
| 166 | + |
| 167 | + if dataset["type"] == "multifunctional": |
| 168 | + # Can have some readonly allocated processes which refer to non-functional edges |
| 169 | + for ds in MultifunctionalDatabase(dataset["database"]): |
| 170 | + if ( |
| 171 | + ds["type"] in ("readonly_process",) |
| 172 | + and ds.get("mf_parent_key") == dataset.key |
| 173 | + and ds["mf_allocation_run_uuid"] != dataset["mf_allocation_run_uuid"] |
| 174 | + ): |
| 175 | + ds.delete() |
| 176 | + |
| 177 | + for exc in dataset.exchanges(): |
| 178 | + try: |
| 179 | + exc.input |
| 180 | + except UnknownObject: |
| 181 | + exc.input = dataset |
| 182 | + exc.save() |
| 183 | + logger.debug( |
| 184 | + "Edge to deleted readonly process redirected to parent process: %s", |
| 185 | + exc, |
| 186 | + ) |
| 187 | + |
| 188 | + else: |
| 189 | + # Process or chimaera process with one functional edge |
| 190 | + # Make sure that single functional edge is not referring to obsolete readonly process |
| 191 | + functional_edges = [exc for exc in dataset.exchanges() if exc.get("functional")] |
| 192 | + if not len(functional_edges) < 2: |
| 193 | + raise ValueError( |
| 194 | + f"Process marked monofunctional with type {dataset['type']} but has {len(functional_edges)} functional edges" |
| 195 | + ) |
| 196 | + edge = functional_edges[0] |
| 197 | + if edge.input["type"] in ( |
| 198 | + "readonly_process", |
| 199 | + ): # TBD https://github.com/brightway-lca/multifunctional/issues/23 |
| 200 | + # This node should be deleted; have to change to chimaera process with self-input |
| 201 | + logger.debug( |
| 202 | + "Edge to expired readonly process %s redirected to parent process %s", |
| 203 | + edge.input, |
| 204 | + dataset, |
| 205 | + ) |
| 206 | + edge.input = dataset |
| 207 | + edge.save() |
| 208 | + if dataset["type"] != labels.chimaera_node_default: |
| 209 | + logger.debug( |
| 210 | + "Change node type to chimaera: %s (%s)", |
| 211 | + dataset, |
| 212 | + dataset.id, |
| 213 | + ) |
| 214 | + dataset["type"] = labels.chimaera_node_default |
| 215 | + |
| 216 | + # Obsolete readonly processes |
| 217 | + for ds in MultifunctionalDatabase(dataset["database"]): |
| 218 | + if ds["type"] in ("readonly_process",) and ds.get("mf_parent_key") == dataset.key: |
| 219 | + ds.delete() |
0 commit comments