Skip to content

Commit

Permalink
refactor: wrap yaml, dto and core in domain classes
Browse files Browse the repository at this point in the history
Init work to remove dtos by wrapping dto and mapping in (yaml) domain
classes.
  • Loading branch information
jsolaas committed Oct 3, 2024
1 parent 328ce12 commit 5ac64ad
Show file tree
Hide file tree
Showing 51 changed files with 1,867 additions and 695 deletions.
7 changes: 2 additions & 5 deletions src/ecalc_cli/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,8 @@ def run(

energy_calculator = EnergyCalculator(graph=model.get_graph())
precision = 6
consumer_results = energy_calculator.evaluate_energy_usage(model.variables)
emission_results = energy_calculator.evaluate_emissions(
variables_map=model.variables,
consumer_results=consumer_results,
)
consumer_results = energy_calculator.evaluate_energy_usage()
emission_results = energy_calculator.evaluate_emissions()
results_core = GraphResult(
graph=model.get_graph(),
consumer_results=consumer_results,
Expand Down
Empty file.
236 changes: 39 additions & 197 deletions src/libecalc/application/energy_calculator.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,18 @@
from collections import defaultdict
from datetime import datetime
from functools import reduce
from typing import Dict

import numpy as np

import libecalc.dto.components
from libecalc.common.consumption_type import ConsumptionType
from libecalc.common.list.list_utils import elementwise_sum
from libecalc.common.math.numbers import Numbers
from libecalc.common.priorities import PriorityID
from libecalc.common.priority_optimizer import PriorityOptimizer
from libecalc.common.temporal_model import TemporalModel
from libecalc.common.units import Unit
from libecalc.common.utils.rates import TimeSeriesInt, TimeSeriesString
from libecalc.common.variables import VariablesMap
from libecalc.core.consumers.consumer_system import ConsumerSystem
from libecalc.core.consumers.factory import create_consumer
from libecalc.core.consumers.generator_set import Genset
from libecalc.core.consumers.legacy_consumer.component import Consumer
from libecalc.core.consumers.legacy_consumer.consumer_function_mapper import EnergyModelMapper
from libecalc.core.models.fuel import FuelModel
from libecalc.core.models.generator import GeneratorModelSampled
from libecalc.core.result import ComponentResult, EcalcModelResult
from libecalc.common.utils.rates import TimeSeriesStreamDayRate
from libecalc.core.result import EcalcModelResult
from libecalc.core.result.emission import EmissionResult
from libecalc.dto.component_graph import ComponentGraph
from libecalc.dto.components import (
ConsumerSystem as ConsumerSystemDTO,
)
from libecalc.dto.components import (
ElectricityConsumer as ElectricityConsumerDTO,
)
from libecalc.dto.components import (
FuelConsumer as FuelConsumerDTO,
)
from libecalc.dto.components import (
GeneratorSet as GeneratorSetDTO,
from libecalc.dto.component_graph import (
Component,
ComponentGraph,
Emitter,
FuelConsumer,
PowerConsumer,
PowerProvider,
ProcessGraph,
)
from libecalc.presentation.yaml.yaml_types.emitters.yaml_venting_emitter import (
YamlDirectTypeEmitter,
YamlOilTypeEmitter,
)


def merge_results(results_per_timestep: Dict[datetime, EcalcModelResult]) -> EcalcModelResult:
return reduce(lambda acc, x: acc.merge(x), results_per_timestep.values())


class EnergyCalculator:
Expand All @@ -54,181 +22,55 @@ def __init__(
):
self._graph = graph

def evaluate_energy_usage(self, variables_map: VariablesMap) -> Dict[str, EcalcModelResult]:
def evaluate_energy_usage(self) -> Dict[str, EcalcModelResult]:
component_ids = list(reversed(self._graph.sorted_node_ids))
component_dtos = [self._graph.get_node(component_id) for component_id in component_ids]
components_to_evaluate = [self._graph.get_node(component_id) for component_id in component_ids]

consumer_results: Dict[str, EcalcModelResult] = {}
power_requirements: Dict[str, TimeSeriesStreamDayRate] = {}

for component_dto in component_dtos:
if isinstance(component_dto, (ElectricityConsumerDTO, FuelConsumerDTO)):
consumer = Consumer(
id=component_dto.id,
name=component_dto.name,
component_type=component_dto.component_type,
regularity=TemporalModel(component_dto.regularity),
consumes=component_dto.consumes,
energy_usage_model=TemporalModel(
{
start_time: EnergyModelMapper.from_dto_to_domain(model)
for start_time, model in component_dto.energy_usage_model.items()
}
),
)
consumer_results[component_dto.id] = consumer.evaluate(expression_evaluator=variables_map)
elif isinstance(component_dto, GeneratorSetDTO):
fuel_consumer = Genset(
id=component_dto.id,
name=component_dto.name,
temporal_generator_set_model=TemporalModel(
{
start_time: GeneratorModelSampled(
fuel_values=model.fuel_values,
power_values=model.power_values,
energy_usage_adjustment_constant=model.energy_usage_adjustment_constant,
energy_usage_adjustment_factor=model.energy_usage_adjustment_factor,
)
for start_time, model in component_dto.generator_set_model.items()
}
),
)
for component in components_to_evaluate:
if isinstance(component, PowerConsumer):
power_requirements[component.id] = component.get_power_requirement()

power_requirement = elementwise_sum(
*[
consumer_results[consumer_id].component_result.power.values
for consumer_id in self._graph.get_successors(component_dto.id)
],
timesteps=variables_map.time_vector,
)
elif isinstance(component, PowerProvider):
power_consumers_for_provider = self._graph.get_successors(component.id)
power_requirements_for_provider = [
power_requirements[power_consumer_id] for power_consumer_id in power_consumers_for_provider
]
component.provide_power(power_requirements_for_provider)

consumer_results[component_dto.id] = EcalcModelResult(
component_result=fuel_consumer.evaluate(
expression_evaluator=variables_map,
power_requirement=power_requirement,
),
models=[],
sub_components=[],
)
elif isinstance(component_dto, libecalc.dto.components.ConsumerSystem):
evaluated_stream_conditions = component_dto.evaluate_stream_conditions(
expression_evaluator=variables_map,
)
optimizer = PriorityOptimizer()
elif isinstance(component, FuelConsumer):
component.get_fuel_usage()

results_per_timestep: Dict[str, Dict[datetime, ComponentResult]] = defaultdict(dict)
priorities_used = TimeSeriesString(
timesteps=[],
values=[],
unit=Unit.NONE,
)
for timestep in variables_map.time_vector:
consumers_for_timestep = [
create_consumer(
consumer=consumer,
timestep=timestep,
)
for consumer in component_dto.consumers
]
elif isinstance(component, ProcessGraph):
component.evaluate()

consumer_system = ConsumerSystem(
id=component_dto.id,
consumers=consumers_for_timestep,
component_conditions=component_dto.component_conditions,
)

def evaluator(priority: PriorityID):
stream_conditions_for_priority = evaluated_stream_conditions[priority]
stream_conditions_for_timestep = {
component_id: [
stream_condition.for_timestep(timestep) for stream_condition in stream_conditions
]
for component_id, stream_conditions in stream_conditions_for_priority.items()
}
return consumer_system.evaluate_consumers(stream_conditions_for_timestep)

optimizer_result = optimizer.optimize(
priorities=list(evaluated_stream_conditions.keys()),
evaluator=evaluator,
)
priorities_used.append(timestep=timestep, value=optimizer_result.priority_used)
for consumer_result in optimizer_result.priority_results:
results_per_timestep[consumer_result.id][timestep] = consumer_result

# merge consumer results
consumer_ids = [consumer.id for consumer in component_dto.consumers]
merged_consumer_results = []
for consumer_id in consumer_ids:
first_result, *rest_results = list(results_per_timestep[consumer_id].values())
merged_consumer_results.append(first_result.merge(*rest_results))

# Convert to legacy compatible operational_settings_used
priorities_to_int_map = {
priority_name: index + 1 for index, priority_name in enumerate(evaluated_stream_conditions.keys())
}
operational_settings_used = TimeSeriesInt(
timesteps=priorities_used.timesteps,
values=[priorities_to_int_map[priority_name] for priority_name in priorities_used.values],
unit=priorities_used.unit,
)

system_result = ConsumerSystem.get_system_result(
id=component_dto.id,
consumer_results=merged_consumer_results,
operational_settings_used=operational_settings_used,
)
consumer_results[component_dto.id] = system_result
for consumer_result in merged_consumer_results:
consumer_results[consumer_result.id] = EcalcModelResult(
component_result=consumer_result,
sub_components=[],
models=[],
)
for component in components_to_evaluate:
if isinstance(component, ProcessGraph):
for sub_component in component.get_components():
consumer_results[sub_component.id] = sub_component.get_ecalc_model_result()
elif isinstance(component, Component):
consumer_results[component.id] = component.get_ecalc_model_result()

return Numbers.format_results_to_precision(consumer_results, precision=6)

def evaluate_emissions(
self, variables_map: VariablesMap, consumer_results: Dict[str, EcalcModelResult]
self,
) -> Dict[str, Dict[str, EmissionResult]]:
"""
Calculate emissions for fuel consumers and emitters
Args:
variables_map:
consumer_results:
Returns: a mapping from consumer_id to emissions
"""
emission_results: Dict[str, Dict[str, EmissionResult]] = {}
for consumer_dto in self._graph.nodes.values():
if isinstance(consumer_dto, (FuelConsumerDTO, GeneratorSetDTO)):
fuel_model = FuelModel(consumer_dto.fuel)
energy_usage = consumer_results[consumer_dto.id].component_result.energy_usage
emission_results[consumer_dto.id] = fuel_model.evaluate_emissions(
expression_evaluator=variables_map,
fuel_rate=np.asarray(energy_usage.values),
)
elif isinstance(consumer_dto, ConsumerSystemDTO):
if consumer_dto.consumes == ConsumptionType.FUEL:
fuel_model = FuelModel(consumer_dto.fuel)
energy_usage = consumer_results[consumer_dto.id].component_result.energy_usage
emission_results[consumer_dto.id] = fuel_model.evaluate_emissions(
expression_evaluator=variables_map, fuel_rate=np.asarray(energy_usage.values)
)
elif isinstance(consumer_dto, (YamlDirectTypeEmitter, YamlOilTypeEmitter)):
installation_id = self._graph.get_parent_installation_id(consumer_dto.id)
installation = self._graph.get_node(installation_id)
if isinstance(consumer_dto, Emitter):
emission_results[consumer_dto.id] = consumer_dto.get_emissions()

venting_emitter_results = {}
emission_rates = consumer_dto.get_emissions(
expression_evaluator=variables_map, regularity=installation.regularity
)
elif isinstance(consumer_dto, ProcessGraph):
for sub_component in consumer_dto.get_components():
if isinstance(sub_component, Emitter):
emission_results[sub_component.id] = sub_component.get_emissions()

for emission_name, emission_rate in emission_rates.items():
emission_result = EmissionResult(
name=emission_name,
timesteps=variables_map.time_vector,
rate=emission_rate,
)
venting_emitter_results[emission_name] = emission_result
emission_results[consumer_dto.id] = venting_emitter_results
return Numbers.format_results_to_precision(emission_results, precision=6)
2 changes: 1 addition & 1 deletion src/libecalc/application/graph_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(
self.variables_map = variables_map

def get_subgraph(self, node_id: str) -> "GraphResult":
subgraph = self.graph.get_node(node_id).get_graph()
subgraph = self.graph.get_subgraph(node_id)

return GraphResult(
graph=subgraph,
Expand Down
7 changes: 0 additions & 7 deletions src/libecalc/common/errors/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ def __init__(self, message: str):
super().__init__("Illegal state", message, error_type=EcalcErrorType.SERVER_ERROR)


class InvalidReferenceException(EcalcError):
"""The data provided is missing a required reference."""

def __init__(self, message: str):
super().__init__("Invalid reference", message, error_type=EcalcErrorType.CLIENT_ERROR)


class InvalidDateException(EcalcError): ...


Expand Down
21 changes: 18 additions & 3 deletions src/libecalc/common/graph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import abc
from typing import Dict, Generic, List, Protocol, TypeVar

import networkx as nx
Expand All @@ -10,16 +11,17 @@

class NodeWithID(Protocol):
@property
@abc.abstractmethod
def id(self) -> NodeID: ...


TNode = TypeVar("TNode", bound=NodeWithID)


class Graph(Generic[TNode]):
def __init__(self):
self.graph = nx.DiGraph()
self.nodes: Dict[NodeID, TNode] = {}
def __init__(self, graph: nx.DiGraph = None, nodes: Dict[NodeID, TNode] = None):
self.graph = nx.DiGraph() if graph is None else graph
self.nodes: Dict[NodeID, TNode] = {} if nodes is None else nodes

def add_node(self, node: TNode) -> Self:
self.graph.add_node(node.id)
Expand All @@ -38,6 +40,19 @@ def add_subgraph(self, subgraph: Graph) -> Self:
self.graph = nx.compose(self.graph, subgraph.graph)
return self

def get_subgraph(self, node_id: NodeID) -> Self:
node_ids = {node_id, *self.get_successors(node_id, recursively=True)}
sub_graph = self.__class__()
for node in self.nodes.values():
if node.id in node_ids:
sub_graph.add_node(node)

for from_id, to_id in self.graph.edges:
if from_id in node_ids and to_id in node_ids:
sub_graph.add_edge(from_id, to_id)

return sub_graph

def get_successors(self, node_id: NodeID, recursively=False) -> List[NodeID]:
if recursively:
return [
Expand Down
3 changes: 3 additions & 0 deletions src/libecalc/common/time_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def __contains__(self, time: datetime) -> bool:
"""
return self.start <= time < self.end

def __eq__(self, other):
return self.start == other.start and self.end == other.end

@staticmethod
def intersects(first: Period, second: Period) -> bool:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/libecalc/common/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def evaluate(self, expression: Union[Expression, Dict[datetime, Expression], Tem
if isinstance(expression, Expression):
return expression.evaluate(variables=self.variables, fill_length=len(self.get_time_vector()))
elif isinstance(expression, dict):
return self._evaluate_temporal(temporal_expression=TemporalModel[expression])
return self._evaluate_temporal(temporal_expression=TemporalModel(expression))
elif isinstance(expression, TemporalModel):
return self._evaluate_temporal(temporal_expression=expression)

Expand Down
8 changes: 3 additions & 5 deletions src/libecalc/core/consumers/base/component.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from abc import ABC, abstractmethod
from typing import List

from libecalc.common.stream_conditions import TimeSeriesStreamConditions
from libecalc.common.utils.rates import TimeSeriesFloat
from libecalc.common.variables import ExpressionEvaluator
from libecalc.core.result import EcalcModelResult
from libecalc.domain.stream_conditions import StreamConditions


class BaseConsumer(ABC):
Expand All @@ -20,9 +20,7 @@ class BaseConsumerWithoutOperationalSettings(ABC):
id: str

@abstractmethod
def get_max_rate(
self, inlet_stream: TimeSeriesStreamConditions, target_pressure: TimeSeriesFloat
) -> List[float]: ...
def get_max_rate(self, inlet_stream: StreamConditions, target_pressure: TimeSeriesFloat) -> List[float]: ...

@abstractmethod
def evaluate(self, **kwargs) -> EcalcModelResult: ...
def evaluate(self, streams: List[StreamConditions]) -> EcalcModelResult: ...
Loading

0 comments on commit 5ac64ad

Please sign in to comment.