From 08656f30f474c5fd02f16044b30524def77f4613 Mon Sep 17 00:00:00 2001 From: LKuemmel Date: Wed, 23 Oct 2024 08:26:23 +0200 Subject: [PATCH] restructure ev --- .../algorithm/additional_current_test.py | 3 +- packages/control/algorithm/common_test.py | 2 +- .../algorithm/filter_chargepoints_test.py | 3 +- .../algorithm/integration_test/conftest.py | 2 +- .../algorithm/surplus_controlled_test.py | 3 +- packages/control/auto_phase_switch_test.py | 2 +- packages/control/chargepoint/chargepoint.py | 2 +- .../control/chargepoint/chargepoint_data.py | 2 +- .../chargepoint/chargepoint_template.py | 2 +- .../control/chargepoint/chargepoint_test.py | 2 +- .../control/chargepoint/get_phases_test.py | 2 +- packages/control/counter.py | 2 +- packages/control/counter_test.py | 3 +- packages/control/data.py | 4 +- packages/control/ev.py | 985 ------------------ packages/control/ev/charge_template.py | 478 +++++++++ .../charge_template_test.py} | 12 +- packages/control/ev/ev.py | 469 +++++++++ packages/control/ev/ev_template.py | 36 + packages/control/{ => ev}/ev_test.py | 2 +- packages/control/ocpp_test.py | 2 +- packages/control/phase_switch.py | 2 +- packages/helpermodules/command.py | 3 +- .../data_migration/data_migration.py | 3 +- .../measurement_logging/update_yields.py | 4 +- packages/helpermodules/subdata.py | 23 +- packages/helpermodules/timecheck_test.py | 2 +- packages/helpermodules/update_config.py | 22 +- packages/modules/update_soc.py | 2 +- packages/modules/update_soc_test.py | 7 +- 30 files changed, 1050 insertions(+), 1036 deletions(-) delete mode 100644 packages/control/ev.py create mode 100644 packages/control/ev/charge_template.py rename packages/control/{ev_charge_template_test.py => ev/charge_template_test.py} (96%) create mode 100644 packages/control/ev/ev.py create mode 100644 packages/control/ev/ev_template.py rename packages/control/{ => ev}/ev_test.py (98%) diff --git a/packages/control/algorithm/additional_current_test.py b/packages/control/algorithm/additional_current_test.py index 1336f19718..89c552aeff 100644 --- a/packages/control/algorithm/additional_current_test.py +++ b/packages/control/algorithm/additional_current_test.py @@ -5,7 +5,8 @@ from control.chargepoint.chargepoint import Chargepoint, ChargepointData from control.chargepoint.chargepoint_data import Set from control.chargepoint.control_parameter import ControlParameter -from control.ev import ChargeTemplate, Ev +from control.ev.charge_template import ChargeTemplate +from control.ev.ev import Ev from control.loadmanagement import LimitingValue diff --git a/packages/control/algorithm/common_test.py b/packages/control/algorithm/common_test.py index 9dbf476953..5435c4ef15 100644 --- a/packages/control/algorithm/common_test.py +++ b/packages/control/algorithm/common_test.py @@ -6,7 +6,7 @@ from control import data from control.algorithm import common from control.chargepoint.chargepoint import Chargepoint -from control.ev import Ev +from control.ev.ev import Ev from control.counter import Counter from control.counter_all import CounterAll diff --git a/packages/control/algorithm/filter_chargepoints_test.py b/packages/control/algorithm/filter_chargepoints_test.py index 003df60ee4..78a27af9f8 100644 --- a/packages/control/algorithm/filter_chargepoints_test.py +++ b/packages/control/algorithm/filter_chargepoints_test.py @@ -9,8 +9,9 @@ from control.chargemode import Chargemode from control.chargepoint.chargepoint import Chargepoint, ChargepointData from control.chargepoint.chargepoint_data import Log, Set +from control.chargepoint.control_parameter import ControlParameter from control.counter_all import CounterAll -from control.ev import ControlParameter, Ev, EvData, Get +from control.ev.ev import Ev, EvData, Get @dataclass diff --git a/packages/control/algorithm/integration_test/conftest.py b/packages/control/algorithm/integration_test/conftest.py index 9126cbcf59..abf0f02e18 100644 --- a/packages/control/algorithm/integration_test/conftest.py +++ b/packages/control/algorithm/integration_test/conftest.py @@ -9,7 +9,7 @@ from control.chargepoint.chargepoint import Chargepoint from control.counter_all import CounterAll from control.counter import Counter -from control.ev import Ev +from control.ev.ev import Ev from control.pv import Pv from control.chargepoint.chargepoint_state import ChargepointState from test_utils.default_hierarchies import NESTED_HIERARCHY diff --git a/packages/control/algorithm/surplus_controlled_test.py b/packages/control/algorithm/surplus_controlled_test.py index feea652514..1a8c4a11ec 100644 --- a/packages/control/algorithm/surplus_controlled_test.py +++ b/packages/control/algorithm/surplus_controlled_test.py @@ -11,7 +11,8 @@ from control.chargepoint.chargepoint_data import Get, Set from control.chargepoint.chargepoint_template import CpTemplate from control.chargepoint.control_parameter import ControlParameter -from control.ev import ChargeTemplate, Ev +from control.ev.charge_template import ChargeTemplate +from control.ev.ev import Ev @pytest.fixture(autouse=True) diff --git a/packages/control/auto_phase_switch_test.py b/packages/control/auto_phase_switch_test.py index fc66b7d07f..4abbcaf84f 100644 --- a/packages/control/auto_phase_switch_test.py +++ b/packages/control/auto_phase_switch_test.py @@ -8,7 +8,7 @@ from control.pv_all import PvAll from control.bat_all import BatAll from control.general import General -from control.ev import Ev +from control.ev.ev import Ev from control import data from control.chargepoint.chargepoint_state import ChargepointState diff --git a/packages/control/chargepoint/chargepoint.py b/packages/control/chargepoint/chargepoint.py index ba12a10b2e..d2b9cf8cb7 100644 --- a/packages/control/chargepoint/chargepoint.py +++ b/packages/control/chargepoint/chargepoint.py @@ -29,7 +29,7 @@ from control.chargepoint.control_parameter import ControlParameter, control_parameter_factory from control.chargepoint.charging_type import ChargingType from control.chargepoint.rfid import ChargepointRfidMixin -from control.ev import Ev +from control.ev.ev import Ev from control import phase_switch from control.chargepoint.chargepoint_state import ChargepointState from helpermodules.phase_mapping import convert_single_evu_phase_to_cp_phase diff --git a/packages/control/chargepoint/chargepoint_data.py b/packages/control/chargepoint/chargepoint_data.py index a42511116a..cd635d204f 100644 --- a/packages/control/chargepoint/chargepoint_data.py +++ b/packages/control/chargepoint/chargepoint_data.py @@ -4,7 +4,7 @@ from control.chargepoint.chargepoint_template import CpTemplate from control.chargepoint.control_parameter import ControlParameter, control_parameter_factory -from control.ev import Ev +from control.ev.ev import Ev from dataclass_utils.factories import currents_list_factory, empty_dict_factory, voltages_list_factory from helpermodules.constants import NO_ERROR from modules.common.abstract_chargepoint import AbstractChargepoint diff --git a/packages/control/chargepoint/chargepoint_template.py b/packages/control/chargepoint/chargepoint_template.py index 3d70c6ea10..89d7accdb6 100644 --- a/packages/control/chargepoint/chargepoint_template.py +++ b/packages/control/chargepoint/chargepoint_template.py @@ -4,7 +4,7 @@ from typing import Dict, List from control import data -from control import ev as ev_module +from control.ev import ev as ev_module from control.chargepoint.charging_type import ChargingType from dataclass_utils.factories import empty_dict_factory, empty_list_factory from helpermodules.abstract_plans import AutolockPlan diff --git a/packages/control/chargepoint/chargepoint_test.py b/packages/control/chargepoint/chargepoint_test.py index e5e71a0ebe..a976fb547c 100644 --- a/packages/control/chargepoint/chargepoint_test.py +++ b/packages/control/chargepoint/chargepoint_test.py @@ -4,7 +4,7 @@ from control.chargepoint.chargepoint import Chargepoint from control.chargepoint.chargepoint_template import CpTemplate -from control.ev import Ev +from control.ev.ev import Ev @pytest.mark.parametrize("phase_1, phases, expected_required_currents", diff --git a/packages/control/chargepoint/get_phases_test.py b/packages/control/chargepoint/get_phases_test.py index 4f8f5ce47d..55e8258626 100644 --- a/packages/control/chargepoint/get_phases_test.py +++ b/packages/control/chargepoint/get_phases_test.py @@ -5,7 +5,7 @@ from control.chargepoint.chargepoint import Chargepoint from control.chargepoint.chargepoint_template import CpTemplate, get_chargepoint_template_default -from control.ev import Ev +from control.ev.ev import Ev from control.general import General from control import data diff --git a/packages/control/counter.py b/packages/control/counter.py index 83b2f30fe9..27a770e068 100644 --- a/packages/control/counter.py +++ b/packages/control/counter.py @@ -8,7 +8,7 @@ from control import data from control.chargemode import Chargemode -from control.ev import Ev +from control.ev.ev import Ev from control.chargepoint.chargepoint import Chargepoint from control.chargepoint.chargepoint_state import ChargepointState from dataclass_utils.factories import currents_list_factory, voltages_list_factory diff --git a/packages/control/counter_test.py b/packages/control/counter_test.py index b2326c93d9..4b10d1335f 100644 --- a/packages/control/counter_test.py +++ b/packages/control/counter_test.py @@ -7,7 +7,8 @@ from control import data from control.chargepoint.chargepoint import Chargepoint from control.counter import Counter, CounterData, Get -from control.ev import ChargeTemplate, Ev +from control.ev.ev import Ev +from control.ev.charge_template import ChargeTemplate from control.general import General from control.chargepoint.chargepoint_state import ChargepointState from modules.common.fault_state import FaultStateLevel diff --git a/packages/control/data.py b/packages/control/data.py index 60228e0a36..19ab90929b 100644 --- a/packages/control/data.py +++ b/packages/control/data.py @@ -20,7 +20,9 @@ from helpermodules.subdata import SubData from control.counter import Counter from control.counter_all import CounterAll -from control.ev import ChargeTemplate, Ev, EvTemplate +from control.ev.charge_template import ChargeTemplate +from control.ev.ev import Ev +from control.ev.ev_template import EvTemplate from control.general import General from control.optional import Optional from modules.common.abstract_device import AbstractDevice diff --git a/packages/control/ev.py b/packages/control/ev.py deleted file mode 100644 index 811a58e748..0000000000 --- a/packages/control/ev.py +++ /dev/null @@ -1,985 +0,0 @@ -""" EV-Logik -ermittelt, den Ladestrom, den das EV gerne zur Verfügung hätte. - -In den control Parametern wird sich der Lademodus, Submodus, Priorität, Phasen und Stromstärke gemerkt, -mit denen das EV aktuell in der Regelung berücksichtigt wird. Bei der Ermittlung der benötigten Strom- -stärke wird auch geprüft, ob sich an diesen Parametern etwas geändert hat. Falls ja, muss das EV -in der Regelung neu priorisiert werden und eine neue Zuteilung des Stroms erhalten. -""" -from dataclasses import asdict, dataclass, field -import logging -import traceback -from typing import List, Dict, Optional, Tuple - -from control import data -from control.chargepoint.chargepoint_state import ChargepointState, PHASE_SWITCH_STATES -from control.chargepoint.charging_type import ChargingType -from control.chargepoint.control_parameter import ControlParameter -from control.limiting_value import LimitingValue -from dataclass_utils.factories import empty_dict_factory, empty_list_factory -from helpermodules.abstract_plans import Limit, limit_factory, ScheduledChargingPlan, TimeChargingPlan -from helpermodules import timecheck -from helpermodules.constants import NO_ERROR -from modules.common.abstract_vehicle import VehicleUpdateData -from modules.common.configurable_vehicle import ConfigurableVehicle - -log = logging.getLogger(__name__) - - -def get_vehicle_default() -> dict: - return { - "charge_template": 0, - "ev_template": 0, - "name": "Fahrzeug", - "info": { - "manufacturer": None, - "model": None, - }, - "tag_id": [], - "get/soc": 0 - } - - -def get_new_charge_template() -> dict: - ct_default = asdict(ChargeTemplateData()) - ct_default["chargemode"]["scheduled_charging"].pop("plans") - ct_default["time_charging"].pop("plans") - return ct_default - - -def get_charge_template_default() -> dict: - ct_default = asdict(ChargeTemplateData(name="Standard-Lade-Profil")) - ct_default["chargemode"]["scheduled_charging"].pop("plans") - ct_default["time_charging"].pop("plans") - return ct_default - -# Avoid anti-pattern: mutable default arguments - - -@dataclass -class ScheduledCharging: - plans: Dict[int, ScheduledChargingPlan] = field(default_factory=empty_dict_factory, metadata={ - "topic": ""}) - - -@dataclass -class TimeCharging: - active: bool = False - plans: Dict[int, TimeChargingPlan] = field(default_factory=empty_dict_factory, metadata={ - "topic": ""}) - - -@dataclass -class InstantCharging: - current: int = 10 - dc_current: float = 145 - limit: Limit = field(default_factory=limit_factory) - - -@dataclass -class PvCharging: - dc_min_current: float = 145 - dc_min_soc_current: float = 145 - min_soc_current: int = 10 - min_current: int = 0 - feed_in_limit: bool = False - min_soc: int = 0 - max_soc: int = 100 - - -def pv_charging_factory() -> PvCharging: - return PvCharging() - - -def scheduled_charging_factory() -> ScheduledCharging: - return ScheduledCharging() - - -def instant_charging_factory() -> InstantCharging: - return InstantCharging() - - -@dataclass -class Chargemode: - selected: str = "stop" - pv_charging: PvCharging = field(default_factory=pv_charging_factory) - scheduled_charging: ScheduledCharging = field(default_factory=scheduled_charging_factory) - instant_charging: InstantCharging = field(default_factory=instant_charging_factory) - - -def time_charging_factory() -> TimeCharging: - return TimeCharging() - - -def chargemode_factory() -> Chargemode: - return Chargemode() - - -@dataclass -class Et: - active: bool = False - max_price: float = 0.0002 - - -def et_factory() -> Et: - return Et() - - -@dataclass -class ChargeTemplateData: - name: str = "Lade-Profil" - prio: bool = False - load_default: bool = False - et: Et = field(default_factory=et_factory) - time_charging: TimeCharging = field(default_factory=time_charging_factory) - chargemode: Chargemode = field(default_factory=chargemode_factory) - - -def charge_template_data_factory() -> ChargeTemplateData: - return ChargeTemplateData() - - -@dataclass -class EvTemplateData: - dc_min_current: int = 0 - dc_max_current: int = 0 - name: str = "Fahrzeug-Profil" - max_current_multi_phases: int = 16 - max_phases: int = 3 - phase_switch_pause: int = 2 - prevent_phase_switch: bool = False - prevent_charge_stop: bool = False - control_pilot_interruption: bool = False - control_pilot_interruption_duration: int = 4 - average_consump: float = 17000 - min_current: int = 6 - max_current_single_phase: int = 16 - battery_capacity: float = 82000 - efficiency: float = 90 - nominal_difference: float = 1 - keep_charge_active_duration: int = 40 - - -def ev_template_data_factory() -> EvTemplateData: - return EvTemplateData() - - -@dataclass -class EvTemplate: - """ Klasse mit den EV-Daten - """ - - data: EvTemplateData = field(default_factory=ev_template_data_factory, metadata={ - "topic": "config"}) - et_num: int = 0 - - -def ev_template_factory() -> EvTemplate: - return EvTemplate() - - -@dataclass -class Set: - soc_error_counter: int = field( - default=0, metadata={"topic": "set/soc_error_counter"}) - - -def set_factory() -> Set: - return Set() - - -@dataclass -class Get: - soc: Optional[int] = field(default=None, metadata={"topic": "get/soc"}) - soc_timestamp: Optional[float] = field( - default=None, metadata={"topic": "get/soc_timestamp"}) - soc_request_timestamp: Optional[float] = field( - default=None, metadata={"topic": "get/soc_request_timestamp"}) - force_soc_update: bool = field(default=False, metadata={ - "topic": "get/force_soc_update"}) - range: Optional[float] = field(default=None, metadata={"topic": "get/range"}) - fault_state: int = field(default=0, metadata={"topic": "get/fault_state"}) - fault_str: str = field(default=NO_ERROR, metadata={"topic": "get/fault_str"}) - - -def get_factory() -> Get: - return Get() - - -@dataclass -class EvData: - set: Set = field(default_factory=set_factory) - charge_template: int = field(default=0, metadata={"topic": "charge_template"}) - ev_template: int = field(default=0, metadata={"topic": "ev_template"}) - name: str = field(default="neues Fahrzeug", metadata={"topic": "name"}) - tag_id: List[str] = field(default_factory=empty_list_factory, metadata={ - "topic": "tag_id"}) - get: Get = field(default_factory=get_factory) - - -class Ev: - """Logik des EV - """ - - def __init__(self, index: int): - try: - self.ev_template: EvTemplate = EvTemplate() - self.charge_template: ChargeTemplate = ChargeTemplate(0) - self.soc_module: ConfigurableVehicle = None - self.chargemode_changed = False - self.submode_changed = False - self.num = index - self.data = EvData() - except Exception: - log.exception("Fehler im ev-Modul "+str(self.num)) - - def soc_interval_expired(self, vehicle_update_data: VehicleUpdateData) -> bool: - request_soc = False - if self.data.get.soc_request_timestamp is None: - # Initiale Abfrage - request_soc = True - else: - if vehicle_update_data.plug_state is True or self.soc_module.general_config.request_only_plugged is False: - if (vehicle_update_data.charge_state is True or - (self.data.set.soc_error_counter < 3 and self.data.get.fault_state == 2)): - interval = self.soc_module.general_config.request_interval_charging - else: - interval = self.soc_module.general_config.request_interval_not_charging - # Zeitstempel prüfen, ob wieder abgefragt werden muss. - if timecheck.check_timestamp(self.data.get.soc_request_timestamp, interval-5) is False: - # Zeit ist abgelaufen - request_soc = True - return request_soc - - def get_required_current(self, - control_parameter: ControlParameter, - imported: float, - max_phases_hw: int, - phase_switch_supported: bool, - charging_type: str) -> Tuple[bool, Optional[str], str, float, int]: - """ ermittelt, ob und mit welchem Strom das EV geladen werden soll (unabhängig vom Lastmanagement) - - Parameter - --------- - imported_since_mode_switch: float - seit dem letzten Lademodi-Wechsel geladene Energie. - Return - ------ - state: bool - Soll geladen werden? - message: str - Nachricht, warum nicht geladen werden soll - submode: str - Lademodus, in dem tatsächlich geladen wird - required_current: int - Strom, der nach Ladekonfiguration benötigt wird - """ - phases = None - required_current = None - submode = None - message = None - state = True - try: - if self.charge_template.data.chargemode.selected == "scheduled_charging": - if control_parameter.imported_at_plan_start is None: - control_parameter.imported_at_plan_start = imported - used_amount = imported - control_parameter.imported_at_plan_start - plan_data = self.charge_template.scheduled_charging_recent_plan( - self.data.get.soc, - self.ev_template, - control_parameter.phases, - used_amount, - max_phases_hw, - phase_switch_supported, - charging_type) - soc_request_interval_offset = 0 - if plan_data: - name = self.charge_template.data.chargemode.scheduled_charging.plans[plan_data.num].name - # Wenn mit einem neuen Plan geladen wird, muss auch die Energiemenge von neuem gezählt werden. - if (self.charge_template.data.chargemode.scheduled_charging.plans[plan_data.num].limit. - selected == "amount" and - name != control_parameter.current_plan): - control_parameter.imported_at_plan_start = imported - # Wenn der SoC ein paar Minuten alt ist, kann der Termin trotzdem gehalten werden. - # Zielladen kann nicht genauer arbeiten, als das Abfrageintervall vom SoC. - if (self.soc_module and - self.charge_template.data.chargemode. - scheduled_charging.plans[plan_data.num].limit.selected == "soc"): - soc_request_interval_offset = self.soc_module.general_config.request_interval_charging - control_parameter.current_plan = name - else: - control_parameter.current_plan = None - required_current, submode, message, phases = self.charge_template.scheduled_charging_calc_current( - plan_data, - self.data.get.soc, - used_amount, - control_parameter.phases, - control_parameter.min_current, - soc_request_interval_offset) - - # Wenn Zielladen auf Überschuss wartet, prüfen, ob Zeitladen aktiv ist. - if (submode != "instant_charging" and - self.charge_template.data.time_charging.active): - if control_parameter.imported_at_plan_start is None: - control_parameter.imported_at_plan_start = imported - used_amount = imported - control_parameter.imported_at_plan_start - tmp_current, tmp_submode, tmp_message, name = self.charge_template.time_charging( - self.data.get.soc, - used_amount, - charging_type - ) - # Info vom Zielladen erhalten - message = f"{message or ''} {tmp_message or ''}".strip() - if tmp_current > 0: - control_parameter.current_plan = name - # Wenn mit einem neuen Plan geladen wird, muss auch die Energiemenge von neuem gezählt werden. - if name != control_parameter.current_plan: - control_parameter.imported_at_plan_start = imported - required_current = tmp_current - submode = tmp_submode - if (required_current == 0) or (required_current is None): - if self.charge_template.data.chargemode.selected == "instant_charging": - # Wenn der Submode auf stop gestellt wird, wird auch die Energiemenge seit Wechsel des Modus - # zurückgesetzt, dann darf nicht die Energiemenge erneute geladen werden. - if control_parameter.imported_instant_charging is None: - control_parameter.imported_instant_charging = imported - used_amount = imported - control_parameter.imported_instant_charging - required_current, submode, message = self.charge_template.instant_charging( - self.data.get.soc, - used_amount, - charging_type) - elif self.charge_template.data.chargemode.selected == "pv_charging": - required_current, submode, message = self.charge_template.pv_charging( - self.data.get.soc, control_parameter.min_current, charging_type) - elif self.charge_template.data.chargemode.selected == "standby": - # Text von Zeit-und Zielladen nicht überschreiben. - if message is None: - required_current, submode, message = self.charge_template.standby() - else: - required_current, submode, _ = self.charge_template.standby() - elif self.charge_template.data.chargemode.selected == "stop": - required_current, submode, message = self.charge_template.stop() - if submode == "stop" or submode == "standby" or (self.charge_template.data.chargemode.selected == "stop"): - state = False - if phases is None: - phases = control_parameter.phases - return state, message, submode, required_current, phases - except Exception as e: - log.exception("Fehler im ev-Modul "+str(self.num)) - return (False, f"Kein Ladevorgang, da ein Fehler aufgetreten ist: {' '.join(e.args)}", "stop", 0, - control_parameter.phases) - - def set_chargemode_changed(self, control_parameter: ControlParameter, submode: str) -> None: - if ((submode == "time_charging" and control_parameter.chargemode != "time_charging") or - (submode != "time_charging" and - control_parameter.chargemode != self.charge_template.data.chargemode.selected)): - self.chargemode_changed = True - log.debug("Änderung des Lademodus") - else: - self.chargemode_changed = False - - def set_submode_changed(self, control_parameter: ControlParameter, submode: str) -> None: - self.submode_changed = (submode != control_parameter.submode) - - def check_min_max_current(self, - control_parameter: ControlParameter, - required_current: float, - phases: int, - charging_type: str, - pv: bool = False,) -> Tuple[float, Optional[str]]: - """ prüft, ob der gesetzte Ladestrom über dem Mindest-Ladestrom und unter dem Maximal-Ladestrom des EVs liegt. - Falls nicht, wird der Ladestrom auf den Mindest-Ladestrom bzw. den Maximal-Ladestrom des EV gesetzt. - Wenn PV-Laden aktiv ist, darf die Stromstärke nicht unter den PV-Mindeststrom gesetzt werden. - """ - msg = None - # Überprüfung bei 0 (automatische Umschaltung) erfolgt nach der Prüfung der Phasenumschaltung, wenn fest - # steht, mit vielen Phasen geladen werden soll. - if phases != 0: - # EV soll/darf nicht laden - if required_current != 0: - if not pv: - if charging_type == ChargingType.AC.value: - min_current = self.ev_template.data.min_current - else: - min_current = self.ev_template.data.dc_min_current - else: - min_current = control_parameter.required_current - if required_current < min_current: - required_current = min_current - msg = ("Die Einstellungen in dem Fahrzeug-Profil beschränken den Strom auf " - f"mindestens {required_current} A.") - else: - if charging_type == ChargingType.AC.value: - if phases == 1: - max_current = self.ev_template.data.max_current_single_phase - else: - max_current = self.ev_template.data.max_current_multi_phases - else: - max_current = self.ev_template.data.dc_max_current - if required_current > max_current: - required_current = max_current - msg = ("Die Einstellungen in dem Fahrzeug-Profil beschränken den Strom auf " - f"maximal {required_current} A.") - return required_current, msg - - CURRENT_OUT_OF_NOMINAL_DIFFERENCE = (", da das Fahrzeug nicht mit der vorgegebenen Stromstärke +/- der erlaubten " - + "Stromabweichung aus dem Fahrzeug-Profil/Minimalen Dauerstrom lädt.") - ENOUGH_POWER = ", da ausreichend Überschuss für mehrphasiges Laden zur Verfügung steht." - NOT_ENOUGH_POWER = ", da nicht ausreichend Überschuss für mehrphasiges Laden zur Verfügung steht." - - def _check_phase_switch_conditions(self, - control_parameter: ControlParameter, - get_currents: List[float], - get_power: float, - max_current_cp: int, - limit: LimitingValue) -> Tuple[bool, Optional[str]]: - # Manche EV laden mit 6.1A bei 6A Soll-Strom - min_current = (max(control_parameter.min_current, control_parameter.required_current) + - self.ev_template.data.nominal_difference) - max_current = (min(self.ev_template.data.max_current_single_phase, max_current_cp) - - self.ev_template.data.nominal_difference) - phases_in_use = control_parameter.phases - pv_config = data.data.general_data.data.chargemode_config.pv_charging - max_phases_ev = self.ev_template.data.max_phases - if self.charge_template.data.chargemode.pv_charging.feed_in_limit: - feed_in_yield = pv_config.feed_in_yield - else: - feed_in_yield = 0 - all_surplus = data.data.counter_all_data.get_evu_counter().get_usable_surplus(feed_in_yield) - required_surplus = control_parameter.min_current * max_phases_ev * 230 - get_power - condition_1_to_3 = (((max(get_currents) > max_current and - all_surplus > required_surplus) or limit == LimitingValue.UNBALANCED_LOAD.value) and - phases_in_use == 1) - condition_3_to_1 = max(get_currents) < min_current and all_surplus <= 0 and phases_in_use > 1 - if condition_1_to_3 or condition_3_to_1: - return True, None - else: - if phases_in_use > 1 and all_surplus > 0: - return False, self.ENOUGH_POWER - elif phases_in_use == 1 and all_surplus < required_surplus: - return False, self.NOT_ENOUGH_POWER - else: - return False, self.CURRENT_OUT_OF_NOMINAL_DIFFERENCE - - PHASE_SWITCH_DELAY_TEXT = '{} Phasen in {}.' - - def auto_phase_switch(self, - control_parameter: ControlParameter, - cp_num: int, - get_currents: List[float], - get_power: float, - max_current_cp: int, - max_phases: int, - limit: LimitingValue) -> Tuple[int, float, Optional[str]]: - message = None - current = control_parameter.required_current - timestamp_auto_phase_switch = control_parameter.timestamp_auto_phase_switch - phases_to_use = control_parameter.phases - phases_in_use = control_parameter.phases - pv_config = data.data.general_data.data.chargemode_config.pv_charging - cm_config = data.data.general_data.data.chargemode_config - if self.charge_template.data.chargemode.pv_charging.feed_in_limit: - feed_in_yield = pv_config.feed_in_yield - else: - feed_in_yield = 0 - all_surplus = data.data.counter_all_data.get_evu_counter().get_usable_surplus(feed_in_yield) - if phases_in_use == 1: - direction_str = f"Umschaltung von 1 auf {max_phases}" - delay = cm_config.phase_switch_delay * 60 - required_reserved_power = (control_parameter.min_current * max_phases * 230 - - self.ev_template.data.max_current_single_phase * 230) - - new_phase = max_phases - new_current = control_parameter.min_current - else: - direction_str = f"Umschaltung von {max_phases} auf 1" - delay = (16 - cm_config.phase_switch_delay) * 60 - # Es kann einphasig mit entsprechend niedriger Leistung gestartet werden. - required_reserved_power = 0 - new_phase = 1 - new_current = self.ev_template.data.max_current_single_phase - - log.debug( - f'Genutzter Strom: {max(get_currents)}A, Überschuss: {all_surplus}W, benötigte neue Leistung: ' - f'{required_reserved_power}W') - # Wenn gerade umgeschaltet wird, darf kein Timer gestartet werden. - if not self.ev_template.data.prevent_phase_switch: - condition, condition_msg = self._check_phase_switch_conditions(control_parameter, - get_currents, - get_power, - max_current_cp, - limit) - if control_parameter.state not in PHASE_SWITCH_STATES: - if condition: - # Umschaltverzögerung starten - timestamp_auto_phase_switch = timecheck.create_timestamp() - # Wenn nach der Umschaltung weniger Leistung benötigt wird, soll während der Verzögerung keine - # neuen eingeschaltet werden. - data.data.counter_all_data.get_evu_counter( - ).data.set.reserved_surplus += max(0, required_reserved_power) - message = self.PHASE_SWITCH_DELAY_TEXT.format( - direction_str, - timecheck.convert_timestamp_delta_to_time_string(timestamp_auto_phase_switch, delay)) - control_parameter.state = ChargepointState.PHASE_SWITCH_DELAY - elif condition_msg: - log.debug(f"Keine Phasenumschaltung{condition_msg}") - else: - if condition: - # Timer laufen lassen - if timecheck.check_timestamp(timestamp_auto_phase_switch, delay): - message = self.PHASE_SWITCH_DELAY_TEXT.format( - direction_str, - timecheck.convert_timestamp_delta_to_time_string(timestamp_auto_phase_switch, delay)) - else: - timestamp_auto_phase_switch = None - data.data.counter_all_data.get_evu_counter( - ).data.set.reserved_surplus -= max(0, required_reserved_power) - phases_to_use = new_phase - current = new_current - log.debug("Phasenumschaltung kann nun durchgeführt werden.") - control_parameter.state = ChargepointState.PHASE_SWITCH_AWAITED - else: - timestamp_auto_phase_switch = None - data.data.counter_all_data.get_evu_counter( - ).data.set.reserved_surplus -= max(0, required_reserved_power) - message = f"Verzögerung für die {direction_str} Phasen abgebrochen{condition_msg}" - control_parameter.state = ChargepointState.CHARGING_ALLOWED - - if message: - log.info(f"LP {cp_num}: {message}") - if timestamp_auto_phase_switch != control_parameter.timestamp_auto_phase_switch: - control_parameter.timestamp_auto_phase_switch = timestamp_auto_phase_switch - return phases_to_use, current, message - - def reset_phase_switch(self, control_parameter: ControlParameter): - """ Zurücksetzen der Zeitstempel und reservierten Leistung. - - Die Phasenumschaltung kann nicht abgebrochen werden! - """ - if control_parameter.timestamp_auto_phase_switch is not None: - control_parameter.timestamp_auto_phase_switch = None - # Wenn der Timer läuft, ist den Control-Parametern die alte Phasenzahl hinterlegt. - if control_parameter.phases == 1: - reserved = control_parameter.required_current * \ - 3 * 230 - self.ev_template.data.max_current_single_phase * 230 - data.data.counter_all_data.get_evu_counter().data.set.reserved_surplus -= reserved - log.debug( - "Zurücksetzen der reservierten Leistung für die Phasenumschaltung. reservierte Leistung: " + - str(data.data.counter_all_data.get_evu_counter().data.set.reserved_surplus)) - else: - reserved = self.ev_template.data.max_current_single_phase * \ - 230 - control_parameter.required_current * 3 * 230 - data.data.counter_all_data.get_evu_counter().data.set.reserved_surplus -= reserved - log.debug( - "Zurücksetzen der reservierten Leistung für die Phasenumschaltung. reservierte Leistung: " + - str(data.data.counter_all_data.get_evu_counter().data.set.reserved_surplus)) - control_parameter.state = ChargepointState.CHARGING_ALLOWED - - def load_default_profile(self): - """ prüft, ob nach dem Abstecken das Standardprofil geladen werden soll und lädt dieses ggf.. - """ - pass - - def lock_cp(self): - """prüft, ob nach dem Abstecken der LP gesperrt werden soll und sperrt diesen ggf.. - """ - pass - - -@dataclass -class SelectedPlan: - remaining_time: float = 0 - available_current: float = 14 - duration: float = 0 - max_current: int = 16 - missing_amount: float = 0 - phases: int = 1 - num: int = 0 - - -@dataclass -class ChargeTemplate: - """ Klasse der Lade-Profile - """ - ct_num: int - data: ChargeTemplateData = field(default_factory=charge_template_data_factory, metadata={ - "topic": ""}) - - BUFFER = -1200 # nach mehr als 20 Min Überschreitung wird der Termin als verpasst angesehen - CHARGING_PRICE_EXCEEDED = "Keine Ladung, da der aktuelle Strompreis über dem maximalen Strompreis liegt." - - TIME_CHARGING_NO_PLAN_CONFIGURED = "Keine Ladung, da keine Zeitfenster für Zeitladen konfiguriert sind." - TIME_CHARGING_NO_PLAN_ACTIVE = "Keine Ladung, da kein Zeitfenster für Zeitladen aktiv ist." - TIME_CHARGING_SOC_REACHED = "Kein Zeitladen, da der Soc bereits erreicht wurde." - TIME_CHARGING_AMOUNT_REACHED = "Kein Zeitladen, da die Energiemenge bereits geladen wurde." - - def time_charging(self, - soc: Optional[float], - used_amount_time_charging: float, - charging_type: str) -> Tuple[int, str, Optional[str], Optional[str]]: - """ prüft, ob ein Zeitfenster aktiv ist und setzt entsprechend den Ladestrom - """ - message = None - try: - if self.data.time_charging.plans: - plan = timecheck.check_plans_timeframe(self.data.time_charging.plans) - if plan is not None: - current = plan.current if charging_type == ChargingType.AC.value else plan.dc_current - if self.data.et.active and data.data.optional_data.et_provider_available(): - if not data.data.optional_data.et_price_lower_than_limit(self.data.et.max_price): - return 0, "stop", self.CHARGING_PRICE_EXCEEDED, plan.name - if plan.limit.selected == "none": # kein Limit konfiguriert, mit konfigurierter Stromstärke laden - return current, "time_charging", message, plan.name - elif plan.limit.selected == "soc": # SoC Limit konfiguriert - if soc: - if soc < plan.limit.soc: - return current, "time_charging", message, plan.name # Limit nicht erreicht - else: - return 0, "stop", self.TIME_CHARGING_SOC_REACHED, plan.name # Limit erreicht - else: - return plan.current, "time_charging", message, plan.name - elif plan.limit.selected == "amount": # Energiemengenlimit konfiguriert - if used_amount_time_charging < plan.limit.amount: - return current, "time_charging", message, plan.name # Limit nicht erreicht - else: - return 0, "stop", self.TIME_CHARGING_AMOUNT_REACHED, plan.name # Limit erreicht - else: - raise TypeError(f'{plan.limit.selected} unbekanntes Zeitladen-Limit.') - else: - message = self.TIME_CHARGING_NO_PLAN_ACTIVE - else: - message = self.TIME_CHARGING_NO_PLAN_CONFIGURED - log.debug(message) - return 0, "stop", message, None - except Exception: - log.exception("Fehler im ev-Modul "+str(self.ct_num)) - return 0, "stop", "Keine Ladung, da da ein interner Fehler aufgetreten ist: "+traceback.format_exc(), None - - INSTANT_CHARGING_SOC_REACHED = "Kein Sofortladen, da der Soc bereits erreicht wurde." - INSTANT_CHARGING_AMOUNT_REACHED = "Kein Sofortladen, da die Energiemenge bereits geladen wurde." - - def instant_charging(self, - soc: Optional[float], - imported_instant_charging: float, - charging_type: str) -> Tuple[int, str, Optional[str]]: - """ prüft, ob die Lademengenbegrenzung erreicht wurde und setzt entsprechend den Ladestrom. - """ - message = None - try: - instant_charging = self.data.chargemode.instant_charging - if charging_type == ChargingType.AC.value: - current = instant_charging.current - else: - current = instant_charging.dc_current - if self.data.et.active and data.data.optional_data.et_provider_available(): - if not data.data.optional_data.et_price_lower_than_limit(self.data.et.max_price): - return 0, "stop", self.CHARGING_PRICE_EXCEEDED - if instant_charging.limit.selected == "none": - return current, "instant_charging", message - elif instant_charging.limit.selected == "soc": - if soc: - if soc < instant_charging.limit.soc: - return current, "instant_charging", message - else: - return 0, "stop", self.INSTANT_CHARGING_SOC_REACHED - else: - return current, "instant_charging", message - elif instant_charging.limit.selected == "amount": - if imported_instant_charging < self.data.chargemode.instant_charging.limit.amount: - return current, "instant_charging", message - else: - return 0, "stop", self.INSTANT_CHARGING_AMOUNT_REACHED - else: - raise TypeError(f'{instant_charging.limit.selected} unbekanntes Sofortladen-Limit.') - except Exception: - log.exception("Fehler im ev-Modul "+str(self.ct_num)) - return 0, "stop", "Keine Ladung, da da ein interner Fehler aufgetreten ist: "+traceback.format_exc() - - PV_CHARGING_SOC_REACHED = "Keine Ladung, da der maximale Soc bereits erreicht wurde." - PV_CHARGING_SOC_CHARGING = ("Ladung evtl. auch ohne PV-Überschuss, da der Mindest-SoC des Fahrzeugs noch nicht " - "erreicht wurde.") - PV_CHARGING_MIN_CURRENT_CHARGING = "Ladung evtl. auch ohne PV-Überschuss, da minimaler Dauerstrom aktiv ist." - - def pv_charging(self, soc: Optional[float], min_current: int, charging_type: str) -> Tuple[int, str, Optional[str]]: - """ prüft, ob Min-oder Max-Soc erreicht wurden und setzt entsprechend den Ladestrom. - """ - message = None - try: - pv_charging = self.data.chargemode.pv_charging - if soc is None or soc < pv_charging.max_soc: - if pv_charging.min_soc != 0 and soc is not None: - if soc < pv_charging.min_soc: - if charging_type == ChargingType.AC.value: - current = pv_charging.min_soc_current - else: - current = pv_charging.dc_min_soc_current - return current, "instant_charging", self.PV_CHARGING_SOC_CHARGING - if charging_type == ChargingType.AC.value: - pv_min_current = pv_charging.min_current - else: - pv_min_current = pv_charging.dc_min_current - if pv_min_current == 0: - # nur PV; Ampere darf nicht 0 sein, wenn geladen werden soll - return min_current, "pv_charging", message - else: - # Min PV - return pv_min_current, "instant_charging", self.PV_CHARGING_MIN_CURRENT_CHARGING - else: - return 0, "stop", self.PV_CHARGING_SOC_REACHED - except Exception: - log.exception("Fehler im ev-Modul "+str(self.ct_num)) - return 0, "stop", "Keine Ladung, da ein interner Fehler aufgetreten ist: "+traceback.format_exc() - - def scheduled_charging_recent_plan(self, - soc: float, - ev_template: EvTemplate, - phases: int, - used_amount: float, - max_phases: int, - phase_switch_supported: bool, - charging_type: str) -> Tuple[Optional[SelectedPlan], float]: - """ prüft, ob der Ziel-SoC oder die Ziel-Energiemenge erreicht wurde und stellt den zur Erreichung nötigen - Ladestrom ein. Um etwas mehr Puffer zu haben, wird bis 20 Min nach dem Zieltermin noch geladen, wenn dieser - nicht eingehalten werden konnte. - """ - if phase_switch_supported: - if charging_type == ChargingType.AC.value: - max_current = ev_template.data.max_current_multi_phases - else: - max_current = ev_template.data.dc_max_current - instant_phases = data.data.general_data.get_phases_chargemode("scheduled_charging", "instant_charging") - if instant_phases == 0: - planned_phases = 3 - else: - planned_phases = instant_phases - planned_phases = min(planned_phases, max_phases) - plan_data = self.search_plan(max_current, soc, ev_template, planned_phases, used_amount, charging_type) - if (plan_data and - charging_type == ChargingType.AC.value and - instant_phases == 0 and - plan_data.remaining_time > 300 and - self.data.et.active is False): - max_current = ev_template.data.max_current_single_phase - plan_data_single_phase = self.search_plan( - max_current, soc, ev_template, 1, used_amount, charging_type) - if plan_data_single_phase: - if plan_data_single_phase.remaining_time > 300: - plan_data = plan_data_single_phase - else: - if charging_type == ChargingType.AC.value: - if phases == 1: - max_current = ev_template.data.max_current_single_phase - else: - max_current = ev_template.data.max_current_multi_phases - else: - max_current = ev_template.data.dc_max_current - plan_data = self.search_plan(max_current, soc, ev_template, phases, used_amount, charging_type) - return plan_data - - def search_plan(self, - max_current: int, - soc: Optional[float], - ev_template: EvTemplate, - phases: int, - used_amount: float, - charging_type: str) -> Optional[SelectedPlan]: - smallest_remaining_time = float("inf") - missed_date_today_of_plan_with_smallest_remaining_time = False - plan_data: Optional[SelectedPlan] = None - battery_capacity = ev_template.data.battery_capacity - for num, plan in self.data.chargemode.scheduled_charging.plans.items(): - if plan.active: - if plan.limit.selected == "soc" and soc is None: - raise ValueError("Um Zielladen mit SoC-Ziel nutzen zu können, bitte ein SoC-Modul konfigurieren " - f"oder im Plan {plan.name} als Begrenzung Energie einstellen.") - try: - duration, missing_amount = self.calculate_duration( - plan, soc, battery_capacity, used_amount, phases, charging_type, ev_template) - remaining_time, missed_date_today = timecheck.check_duration(plan, duration, self.BUFFER) - if remaining_time: - # Wenn der Zeitpunkt vorüber, aber noch nicht abgelaufen ist oder - # wenn noch gar kein Plan vorhanden ist, - if ((remaining_time < 0 and missed_date_today is False) or - # oder der Zeitpunkt noch nicht vorüber ist - remaining_time > 0): - # Wenn die verbleibende Zeit geringer als die niedrigste bisherige verbleibende Zeit ist - if (remaining_time < smallest_remaining_time or - # oder wenn der Zeitpunkt abgelaufen ist und es noch einen Zeitpunkt gibt, der in - # der Zukunft liegt. - (missed_date_today_of_plan_with_smallest_remaining_time and 0 < remaining_time)): - smallest_remaining_time = remaining_time - missed_date_today_of_plan_with_smallest_remaining_time = missed_date_today - if charging_type == ChargingType.AC.value: - available_current = plan.current - else: - available_current = plan.dc_current - plan_data = SelectedPlan( - remaining_time=remaining_time, - available_current=available_current, - max_current=max_current, - phases=phases, - num=num, - missing_amount=missing_amount, - duration=duration) - log.debug(f"Plan-Nr. {num}: Differenz zum Start {remaining_time}s, Dauer {duration/3600}h, " - f"Termin heute verpasst: {missed_date_today}") - except Exception: - log.exception("Fehler im ev-Modul "+str(self.ct_num)) - return plan_data - - def calculate_duration(self, - plan: ScheduledChargingPlan, - soc: Optional[float], - battery_capacity: float, - used_amount: float, - phases: int, - charging_type: str, - ev_template: EvTemplate) -> Tuple[float, float]: - if plan.limit.selected == "soc": - if soc is not None: - missing_amount = ((plan.limit.soc_scheduled - soc) / 100) * battery_capacity - else: - raise ValueError("Um Zielladen mit SoC-Ziel nutzen zu können, bitte ein SoC-Modul konfigurieren.") - else: - missing_amount = plan.limit.amount - used_amount - current = plan.current if charging_type == ChargingType.AC.value else plan.dc_current - current = max(current, ev_template.data.min_current if charging_type == - ChargingType.AC.value else ev_template.data.dc_min_current) - duration = missing_amount/(current * phases*230) * 3600 - return duration, missing_amount - - SCHEDULED_REACHED_LIMIT_SOC = ("Kein Zielladen, da noch Zeit bis zum Zieltermin ist. " - "Kein Zielladen mit Überschuss, da das SoC-Limit für Überschuss-Laden " + - "erreicht wurde.") - SCHEDULED_CHARGING_REACHED_LIMIT_SOC = ("Kein Zielladen, da das Limit für Fahrzeug Laden mit Überschuss (SoC-Limit)" - " sowie der Fahrzeug-SoC (Ziel-SoC) bereits erreicht wurde.") - SCHEDULED_CHARGING_REACHED_AMOUNT = "Kein Zielladen, da die Energiemenge bereits erreicht wurde." - SCHEDULED_CHARGING_REACHED_SCHEDULED_SOC = ("Falls vorhanden wird mit EVU-Überschuss geladen, da der Ziel-Soc " - "für Zielladen bereits erreicht wurde.") - SCHEDULED_CHARGING_NO_PLANS_CONFIGURED = "Keine Ladung, da keine Ziel-Termine konfiguriert sind." - SCHEDULED_CHARGING_NO_DATE_PENDING = "Kein Zielladen, da kein Ziel-Termin in den nächsten 24 Stunden ansteht." - SCHEDULED_CHARGING_USE_PV = ("Kein Zielladen, da noch Zeit bis zum Zieltermin ist. Falls vorhanden, " - "wird mit Überschuss geladen.") - SCHEDULED_CHARGING_MAX_CURRENT = ("Zielladen mit {}A. Der Ladestrom wurde erhöht, um den Zieltermin zu erreichen. " - "Es wird bis max. 20 Minuten nach dem angegebenen Zieltermin geladen.") - SCHEDULED_CHARGING_LIMITED_BY_SOC = 'einen SoC von {}%' - SCHEDULED_CHARGING_LIMITED_BY_AMOUNT = '{}kWh geladene Energie' - SCHEDULED_CHARGING_IN_TIME = ('Zielladen mit mindestens {}A, um {} um {} zu erreichen. Falls vorhanden wird ' - 'zusätzlich EVU-Überschuss geladen.') - SCHEDULED_CHARGING_CHEAP_HOUR = "Zielladen, da ein günstiger Zeitpunkt zum preisbasierten Laden ist." - SCHEDULED_CHARGING_EXPENSIVE_HOUR = ("Zielladen ausstehend, da jetzt kein günstiger Zeitpunkt zum preisbasierten " - "Laden ist. Falls vorhanden, wird mit Überschuss geladen.") - - def scheduled_charging_calc_current(self, - plan_data: Optional[SelectedPlan], - soc: int, - used_amount: float, - control_parameter_phases: int, - min_current: int, - soc_request_interval_offset: int) -> Tuple[float, str, str, int]: - current = 0 - submode = "stop" - if plan_data is None: - if len(self.data.chargemode.scheduled_charging.plans) == 0: - return current, submode, self.SCHEDULED_CHARGING_NO_PLANS_CONFIGURED, control_parameter_phases - else: - return current, submode, self.SCHEDULED_CHARGING_NO_DATE_PENDING, control_parameter_phases - current_plan = self.data.chargemode.scheduled_charging.plans[plan_data.num] - limit = current_plan.limit - phases = plan_data.phases - log.debug("Verwendeter Plan: "+str(current_plan.name)) - if limit.selected == "soc" and soc >= limit.soc_limit and soc >= limit.soc_scheduled: - message = self.SCHEDULED_CHARGING_REACHED_LIMIT_SOC - elif limit.selected == "soc" and limit.soc_scheduled <= soc < limit.soc_limit: - message = self.SCHEDULED_CHARGING_REACHED_SCHEDULED_SOC - current = min_current - submode = "pv_charging" - # bei Überschuss-Laden mit der Phasenzahl aus den control_parameter laden, - # um die Umschaltung zu berücksichtigen. - phases = control_parameter_phases - elif limit.selected == "amount" and used_amount >= limit.amount: - message = self.SCHEDULED_CHARGING_REACHED_AMOUNT - elif 0 - soc_request_interval_offset < plan_data.remaining_time < 300 + soc_request_interval_offset: - # 5 Min vor spätestem Ladestart - if limit.selected == "soc": - limit_string = self.SCHEDULED_CHARGING_LIMITED_BY_SOC.format(limit.soc_scheduled) - else: - limit_string = self.SCHEDULED_CHARGING_LIMITED_BY_AMOUNT.format(limit.amount/1000) - message = self.SCHEDULED_CHARGING_IN_TIME.format( - plan_data.available_current, limit_string, current_plan.time) - current = plan_data.available_current - submode = "instant_charging" - # weniger als die berechnete Zeit verfügbar - # Ladestart wurde um maximal 20 Min verpasst. - elif plan_data.remaining_time <= 0 - soc_request_interval_offset: - if plan_data.duration + plan_data.remaining_time < 0: - current = plan_data.max_current - else: - current = min(plan_data.missing_amount/((plan_data.duration + plan_data.remaining_time) / - 3600)/(phases*230), plan_data.max_current) - message = self.SCHEDULED_CHARGING_MAX_CURRENT.format(round(current, 2)) - submode = "instant_charging" - else: - # Wenn dynamische Tarife aktiv sind, prüfen, ob jetzt ein günstiger Zeitpunkt zum Laden ist. - if self.data.et.active and data.data.optional_data.et_provider_available(): - hour_list = data.data.optional_data.et_get_loading_hours(plan_data.duration, plan_data.remaining_time) - log.debug(f"Günstige Ladezeiten: {hour_list}") - if timecheck.is_list_valid(hour_list): - message = self.SCHEDULED_CHARGING_CHEAP_HOUR - current = plan_data.available_current - submode = "instant_charging" - elif ((limit.selected == "soc" and soc <= limit.soc_limit) or - (limit.selected == "amount" and used_amount < limit.amount)): - message = self.SCHEDULED_CHARGING_EXPENSIVE_HOUR - current = min_current - submode = "pv_charging" - phases = control_parameter_phases - else: - message = self.SCHEDULED_REACHED_LIMIT_SOC - else: - # Wenn SoC-Limit erreicht wurde, soll nicht mehr mit Überschuss geladen werden - if limit.selected == "soc" and soc >= limit.soc_limit: - message = self.SCHEDULED_REACHED_LIMIT_SOC - else: - message = self.SCHEDULED_CHARGING_USE_PV - current = min_current - submode = "pv_charging" - phases = control_parameter_phases - return current, submode, message, phases - - def standby(self) -> Tuple[int, str, str]: - return 0, "standby", "Keine Ladung, da der Lademodus Standby aktiv ist." - - def stop(self) -> Tuple[int, str, str]: - return 0, "stop", "Keine Ladung, da der Lademodus Stop aktiv ist." - - -def get_ev_to_rfid(rfid: str, vehicle_id: Optional[str] = None) -> Optional[int]: - """ ermittelt zum übergebenen ID-Tag das Fahrzeug - - Parameter - --------- - rfid: string - ID-Tag - vehicle_id: string - MAC-Adresse des ID-Tags (nur openWB Pro) - - Return - ------ - vehicle: int - Nummer des EV, das zum Tag gehört - """ - for vehicle in data.data.ev_data: - try: - if "ev" in vehicle: - if vehicle_id is not None and vehicle_id in data.data.ev_data[vehicle].data.tag_id: - log.debug(f"MAC {vehicle_id} wird EV {data.data.ev_data[vehicle].num} zugeordnet.") - return data.data.ev_data[vehicle].num - if rfid in data.data.ev_data[vehicle].data.tag_id: - log.debug(f"RFID {rfid} wird EV {data.data.ev_data[vehicle].num} zugeordnet.") - return data.data.ev_data[vehicle].num - except Exception: - log.exception("Fehler im ev-Modul "+vehicle) - return None - else: - return None diff --git a/packages/control/ev/charge_template.py b/packages/control/ev/charge_template.py new file mode 100644 index 0000000000..1e90f4d67b --- /dev/null +++ b/packages/control/ev/charge_template.py @@ -0,0 +1,478 @@ +from dataclasses import asdict, dataclass, field +import logging +import traceback +from typing import Dict, Optional, Tuple + +from control import data +from control.chargepoint.charging_type import ChargingType +from control.ev.ev_template import EvTemplate +from dataclass_utils.factories import empty_dict_factory +from helpermodules.abstract_plans import Limit, limit_factory, ScheduledChargingPlan, TimeChargingPlan +from helpermodules import timecheck +log = logging.getLogger(__name__) + + +def get_new_charge_template() -> dict: + ct_default = asdict(ChargeTemplateData()) + ct_default["chargemode"]["scheduled_charging"].pop("plans") + ct_default["time_charging"].pop("plans") + return ct_default + + +def get_charge_template_default() -> dict: + ct_default = asdict(ChargeTemplateData(name="Standard-Lade-Profil")) + ct_default["chargemode"]["scheduled_charging"].pop("plans") + ct_default["time_charging"].pop("plans") + return ct_default + + +@dataclass +class ScheduledCharging: + plans: Dict[int, ScheduledChargingPlan] = field(default_factory=empty_dict_factory, metadata={ + "topic": ""}) + + +@dataclass +class TimeCharging: + active: bool = False + plans: Dict[int, TimeChargingPlan] = field(default_factory=empty_dict_factory, metadata={ + "topic": ""}) + + +@dataclass +class InstantCharging: + current: int = 10 + dc_current: float = 145 + limit: Limit = field(default_factory=limit_factory) + + +@dataclass +class PvCharging: + dc_min_current: float = 145 + dc_min_soc_current: float = 145 + min_soc_current: int = 10 + min_current: int = 0 + feed_in_limit: bool = False + min_soc: int = 0 + max_soc: int = 100 + + +def pv_charging_factory() -> PvCharging: + return PvCharging() + + +def scheduled_charging_factory() -> ScheduledCharging: + return ScheduledCharging() + + +def instant_charging_factory() -> InstantCharging: + return InstantCharging() + + +@dataclass +class Chargemode: + selected: str = "stop" + pv_charging: PvCharging = field(default_factory=pv_charging_factory) + scheduled_charging: ScheduledCharging = field(default_factory=scheduled_charging_factory) + instant_charging: InstantCharging = field(default_factory=instant_charging_factory) + + +def time_charging_factory() -> TimeCharging: + return TimeCharging() + + +def chargemode_factory() -> Chargemode: + return Chargemode() + + +@dataclass +class Et: + active: bool = False + max_price: float = 0.0002 + + +def et_factory() -> Et: + return Et() + + +@dataclass +class ChargeTemplateData: + name: str = "Lade-Profil" + prio: bool = False + load_default: bool = False + et: Et = field(default_factory=et_factory) + time_charging: TimeCharging = field(default_factory=time_charging_factory) + chargemode: Chargemode = field(default_factory=chargemode_factory) + + +def charge_template_data_factory() -> ChargeTemplateData: + return ChargeTemplateData() + + +@dataclass +class SelectedPlan: + remaining_time: float = 0 + available_current: float = 14 + duration: float = 0 + max_current: int = 16 + missing_amount: float = 0 + phases: int = 1 + num: int = 0 + + +@dataclass +class ChargeTemplate: + """ Klasse der Lade-Profile + """ + ct_num: int + data: ChargeTemplateData = field(default_factory=charge_template_data_factory, metadata={ + "topic": ""}) + + BUFFER = -1200 # nach mehr als 20 Min Überschreitung wird der Termin als verpasst angesehen + CHARGING_PRICE_EXCEEDED = "Keine Ladung, da der aktuelle Strompreis über dem maximalen Strompreis liegt." + + TIME_CHARGING_NO_PLAN_CONFIGURED = "Keine Ladung, da keine Zeitfenster für Zeitladen konfiguriert sind." + TIME_CHARGING_NO_PLAN_ACTIVE = "Keine Ladung, da kein Zeitfenster für Zeitladen aktiv ist." + TIME_CHARGING_SOC_REACHED = "Kein Zeitladen, da der Soc bereits erreicht wurde." + TIME_CHARGING_AMOUNT_REACHED = "Kein Zeitladen, da die Energiemenge bereits geladen wurde." + + def time_charging(self, + soc: Optional[float], + used_amount_time_charging: float, + charging_type: str) -> Tuple[int, str, Optional[str], Optional[str]]: + """ prüft, ob ein Zeitfenster aktiv ist und setzt entsprechend den Ladestrom + """ + message = None + try: + if self.data.time_charging.plans: + plan = timecheck.check_plans_timeframe(self.data.time_charging.plans) + if plan is not None: + current = plan.current if charging_type == ChargingType.AC.value else plan.dc_current + if self.data.et.active and data.data.optional_data.et_provider_available(): + if not data.data.optional_data.et_price_lower_than_limit(self.data.et.max_price): + return 0, "stop", self.CHARGING_PRICE_EXCEEDED, plan.name + if plan.limit.selected == "none": # kein Limit konfiguriert, mit konfigurierter Stromstärke laden + return current, "time_charging", message, plan.name + elif plan.limit.selected == "soc": # SoC Limit konfiguriert + if soc: + if soc < plan.limit.soc: + return current, "time_charging", message, plan.name # Limit nicht erreicht + else: + return 0, "stop", self.TIME_CHARGING_SOC_REACHED, plan.name # Limit erreicht + else: + return plan.current, "time_charging", message, plan.name + elif plan.limit.selected == "amount": # Energiemengenlimit konfiguriert + if used_amount_time_charging < plan.limit.amount: + return current, "time_charging", message, plan.name # Limit nicht erreicht + else: + return 0, "stop", self.TIME_CHARGING_AMOUNT_REACHED, plan.name # Limit erreicht + else: + raise TypeError(f'{plan.limit.selected} unbekanntes Zeitladen-Limit.') + else: + message = self.TIME_CHARGING_NO_PLAN_ACTIVE + else: + message = self.TIME_CHARGING_NO_PLAN_CONFIGURED + log.debug(message) + return 0, "stop", message, None + except Exception: + log.exception("Fehler im ev-Modul "+str(self.ct_num)) + return 0, "stop", "Keine Ladung, da da ein interner Fehler aufgetreten ist: "+traceback.format_exc(), None + + INSTANT_CHARGING_SOC_REACHED = "Kein Sofortladen, da der Soc bereits erreicht wurde." + INSTANT_CHARGING_AMOUNT_REACHED = "Kein Sofortladen, da die Energiemenge bereits geladen wurde." + + def instant_charging(self, + soc: Optional[float], + imported_instant_charging: float, + charging_type: str) -> Tuple[int, str, Optional[str]]: + """ prüft, ob die Lademengenbegrenzung erreicht wurde und setzt entsprechend den Ladestrom. + """ + message = None + try: + instant_charging = self.data.chargemode.instant_charging + if charging_type == ChargingType.AC.value: + current = instant_charging.current + else: + current = instant_charging.dc_current + if self.data.et.active and data.data.optional_data.et_provider_available(): + if not data.data.optional_data.et_price_lower_than_limit(self.data.et.max_price): + return 0, "stop", self.CHARGING_PRICE_EXCEEDED + if instant_charging.limit.selected == "none": + return current, "instant_charging", message + elif instant_charging.limit.selected == "soc": + if soc: + if soc < instant_charging.limit.soc: + return current, "instant_charging", message + else: + return 0, "stop", self.INSTANT_CHARGING_SOC_REACHED + else: + return current, "instant_charging", message + elif instant_charging.limit.selected == "amount": + if imported_instant_charging < self.data.chargemode.instant_charging.limit.amount: + return current, "instant_charging", message + else: + return 0, "stop", self.INSTANT_CHARGING_AMOUNT_REACHED + else: + raise TypeError(f'{instant_charging.limit.selected} unbekanntes Sofortladen-Limit.') + except Exception: + log.exception("Fehler im ev-Modul "+str(self.ct_num)) + return 0, "stop", "Keine Ladung, da da ein interner Fehler aufgetreten ist: "+traceback.format_exc() + + PV_CHARGING_SOC_REACHED = "Keine Ladung, da der maximale Soc bereits erreicht wurde." + PV_CHARGING_SOC_CHARGING = ("Ladung evtl. auch ohne PV-Überschuss, da der Mindest-SoC des Fahrzeugs noch nicht " + "erreicht wurde.") + PV_CHARGING_MIN_CURRENT_CHARGING = "Ladung evtl. auch ohne PV-Überschuss, da minimaler Dauerstrom aktiv ist." + + def pv_charging(self, soc: Optional[float], min_current: int, charging_type: str) -> Tuple[int, str, Optional[str]]: + """ prüft, ob Min-oder Max-Soc erreicht wurden und setzt entsprechend den Ladestrom. + """ + message = None + try: + pv_charging = self.data.chargemode.pv_charging + if soc is None or soc < pv_charging.max_soc: + if pv_charging.min_soc != 0 and soc is not None: + if soc < pv_charging.min_soc: + if charging_type == ChargingType.AC.value: + current = pv_charging.min_soc_current + else: + current = pv_charging.dc_min_soc_current + return current, "instant_charging", self.PV_CHARGING_SOC_CHARGING + if charging_type == ChargingType.AC.value: + pv_min_current = pv_charging.min_current + else: + pv_min_current = pv_charging.dc_min_current + if pv_min_current == 0: + # nur PV; Ampere darf nicht 0 sein, wenn geladen werden soll + return min_current, "pv_charging", message + else: + # Min PV + return pv_min_current, "instant_charging", self.PV_CHARGING_MIN_CURRENT_CHARGING + else: + return 0, "stop", self.PV_CHARGING_SOC_REACHED + except Exception: + log.exception("Fehler im ev-Modul "+str(self.ct_num)) + return 0, "stop", "Keine Ladung, da ein interner Fehler aufgetreten ist: "+traceback.format_exc() + + def scheduled_charging_recent_plan(self, + soc: float, + ev_template: EvTemplate, + phases: int, + used_amount: float, + max_phases: int, + phase_switch_supported: bool, + charging_type: str) -> Tuple[Optional[SelectedPlan], float]: + """ prüft, ob der Ziel-SoC oder die Ziel-Energiemenge erreicht wurde und stellt den zur Erreichung nötigen + Ladestrom ein. Um etwas mehr Puffer zu haben, wird bis 20 Min nach dem Zieltermin noch geladen, wenn dieser + nicht eingehalten werden konnte. + """ + if phase_switch_supported: + if charging_type == ChargingType.AC.value: + max_current = ev_template.data.max_current_multi_phases + else: + max_current = ev_template.data.dc_max_current + instant_phases = data.data.general_data.get_phases_chargemode("scheduled_charging", "instant_charging") + if instant_phases == 0: + planned_phases = 3 + else: + planned_phases = instant_phases + planned_phases = min(planned_phases, max_phases) + plan_data = self._search_plan(max_current, soc, ev_template, planned_phases, used_amount, charging_type) + if (plan_data and + charging_type == ChargingType.AC.value and + instant_phases == 0 and + plan_data.remaining_time > 300 and + self.data.et.active is False): + max_current = ev_template.data.max_current_single_phase + plan_data_single_phase = self._search_plan( + max_current, soc, ev_template, 1, used_amount, charging_type) + if plan_data_single_phase: + if plan_data_single_phase.remaining_time > 300: + plan_data = plan_data_single_phase + else: + if charging_type == ChargingType.AC.value: + if phases == 1: + max_current = ev_template.data.max_current_single_phase + else: + max_current = ev_template.data.max_current_multi_phases + else: + max_current = ev_template.data.dc_max_current + plan_data = self._search_plan(max_current, soc, ev_template, phases, used_amount, charging_type) + return plan_data + + def _search_plan(self, + max_current: int, + soc: Optional[float], + ev_template: EvTemplate, + phases: int, + used_amount: float, + charging_type: str) -> Optional[SelectedPlan]: + smallest_remaining_time = float("inf") + missed_date_today_of_plan_with_smallest_remaining_time = False + plan_data: Optional[SelectedPlan] = None + battery_capacity = ev_template.data.battery_capacity + for num, plan in self.data.chargemode.scheduled_charging.plans.items(): + if plan.active: + if plan.limit.selected == "soc" and soc is None: + raise ValueError("Um Zielladen mit SoC-Ziel nutzen zu können, bitte ein SoC-Modul konfigurieren " + f"oder im Plan {plan.name} als Begrenzung Energie einstellen.") + try: + duration, missing_amount = self._calculate_duration( + plan, soc, battery_capacity, used_amount, phases, charging_type, ev_template) + remaining_time, missed_date_today = timecheck.check_duration(plan, duration, self.BUFFER) + if remaining_time: + # Wenn der Zeitpunkt vorüber, aber noch nicht abgelaufen ist oder + # wenn noch gar kein Plan vorhanden ist, + if ((remaining_time < 0 and missed_date_today is False) or + # oder der Zeitpunkt noch nicht vorüber ist + remaining_time > 0): + # Wenn die verbleibende Zeit geringer als die niedrigste bisherige verbleibende Zeit ist + if (remaining_time < smallest_remaining_time or + # oder wenn der Zeitpunkt abgelaufen ist und es noch einen Zeitpunkt gibt, der in + # der Zukunft liegt. + (missed_date_today_of_plan_with_smallest_remaining_time and 0 < remaining_time)): + smallest_remaining_time = remaining_time + missed_date_today_of_plan_with_smallest_remaining_time = missed_date_today + if charging_type == ChargingType.AC.value: + available_current = plan.current + else: + available_current = plan.dc_current + plan_data = SelectedPlan( + remaining_time=remaining_time, + available_current=available_current, + max_current=max_current, + phases=phases, + num=num, + missing_amount=missing_amount, + duration=duration) + log.debug(f"Plan-Nr. {num}: Differenz zum Start {remaining_time}s, Dauer {duration/3600}h, " + f"Termin heute verpasst: {missed_date_today}") + except Exception: + log.exception("Fehler im ev-Modul "+str(self.ct_num)) + return plan_data + + def _calculate_duration(self, + plan: ScheduledChargingPlan, + soc: Optional[float], + battery_capacity: float, + used_amount: float, + phases: int, + charging_type: str, + ev_template: EvTemplate) -> Tuple[float, float]: + if plan.limit.selected == "soc": + if soc is not None: + missing_amount = ((plan.limit.soc_scheduled - soc) / 100) * battery_capacity + else: + raise ValueError("Um Zielladen mit SoC-Ziel nutzen zu können, bitte ein SoC-Modul konfigurieren.") + else: + missing_amount = plan.limit.amount - used_amount + current = plan.current if charging_type == ChargingType.AC.value else plan.dc_current + current = max(current, ev_template.data.min_current if charging_type == + ChargingType.AC.value else ev_template.data.dc_min_current) + duration = missing_amount/(current * phases*230) * 3600 + return duration, missing_amount + + SCHEDULED_REACHED_LIMIT_SOC = ("Kein Zielladen, da noch Zeit bis zum Zieltermin ist. " + "Kein Zielladen mit Überschuss, da das SoC-Limit für Überschuss-Laden " + + "erreicht wurde.") + SCHEDULED_CHARGING_REACHED_LIMIT_SOC = ("Kein Zielladen, da das Limit für Fahrzeug Laden mit Überschuss (SoC-Limit)" + " sowie der Fahrzeug-SoC (Ziel-SoC) bereits erreicht wurde.") + SCHEDULED_CHARGING_REACHED_AMOUNT = "Kein Zielladen, da die Energiemenge bereits erreicht wurde." + SCHEDULED_CHARGING_REACHED_SCHEDULED_SOC = ("Falls vorhanden wird mit EVU-Überschuss geladen, da der Ziel-Soc " + "für Zielladen bereits erreicht wurde.") + SCHEDULED_CHARGING_NO_PLANS_CONFIGURED = "Keine Ladung, da keine Ziel-Termine konfiguriert sind." + SCHEDULED_CHARGING_NO_DATE_PENDING = "Kein Zielladen, da kein Ziel-Termin in den nächsten 24 Stunden ansteht." + SCHEDULED_CHARGING_USE_PV = ("Kein Zielladen, da noch Zeit bis zum Zieltermin ist. Falls vorhanden, " + "wird mit Überschuss geladen.") + SCHEDULED_CHARGING_MAX_CURRENT = ("Zielladen mit {}A. Der Ladestrom wurde erhöht, um den Zieltermin zu erreichen. " + "Es wird bis max. 20 Minuten nach dem angegebenen Zieltermin geladen.") + SCHEDULED_CHARGING_LIMITED_BY_SOC = 'einen SoC von {}%' + SCHEDULED_CHARGING_LIMITED_BY_AMOUNT = '{}kWh geladene Energie' + SCHEDULED_CHARGING_IN_TIME = ('Zielladen mit mindestens {}A, um {} um {} zu erreichen. Falls vorhanden wird ' + 'zusätzlich EVU-Überschuss geladen.') + SCHEDULED_CHARGING_CHEAP_HOUR = "Zielladen, da ein günstiger Zeitpunkt zum preisbasierten Laden ist." + SCHEDULED_CHARGING_EXPENSIVE_HOUR = ("Zielladen ausstehend, da jetzt kein günstiger Zeitpunkt zum preisbasierten " + "Laden ist. Falls vorhanden, wird mit Überschuss geladen.") + + def scheduled_charging_calc_current(self, + plan_data: Optional[SelectedPlan], + soc: int, + used_amount: float, + control_parameter_phases: int, + min_current: int, + soc_request_interval_offset: int) -> Tuple[float, str, str, int]: + current = 0 + submode = "stop" + if plan_data is None: + if len(self.data.chargemode.scheduled_charging.plans) == 0: + return current, submode, self.SCHEDULED_CHARGING_NO_PLANS_CONFIGURED, control_parameter_phases + else: + return current, submode, self.SCHEDULED_CHARGING_NO_DATE_PENDING, control_parameter_phases + current_plan = self.data.chargemode.scheduled_charging.plans[plan_data.num] + limit = current_plan.limit + phases = plan_data.phases + log.debug("Verwendeter Plan: "+str(current_plan.name)) + if limit.selected == "soc" and soc >= limit.soc_limit and soc >= limit.soc_scheduled: + message = self.SCHEDULED_CHARGING_REACHED_LIMIT_SOC + elif limit.selected == "soc" and limit.soc_scheduled <= soc < limit.soc_limit: + message = self.SCHEDULED_CHARGING_REACHED_SCHEDULED_SOC + current = min_current + submode = "pv_charging" + # bei Überschuss-Laden mit der Phasenzahl aus den control_parameter laden, + # um die Umschaltung zu berücksichtigen. + phases = control_parameter_phases + elif limit.selected == "amount" and used_amount >= limit.amount: + message = self.SCHEDULED_CHARGING_REACHED_AMOUNT + elif 0 - soc_request_interval_offset < plan_data.remaining_time < 300 + soc_request_interval_offset: + # 5 Min vor spätestem Ladestart + if limit.selected == "soc": + limit_string = self.SCHEDULED_CHARGING_LIMITED_BY_SOC.format(limit.soc_scheduled) + else: + limit_string = self.SCHEDULED_CHARGING_LIMITED_BY_AMOUNT.format(limit.amount/1000) + message = self.SCHEDULED_CHARGING_IN_TIME.format( + plan_data.available_current, limit_string, current_plan.time) + current = plan_data.available_current + submode = "instant_charging" + # weniger als die berechnete Zeit verfügbar + # Ladestart wurde um maximal 20 Min verpasst. + elif plan_data.remaining_time <= 0 - soc_request_interval_offset: + if plan_data.duration + plan_data.remaining_time < 0: + current = plan_data.max_current + else: + current = min(plan_data.missing_amount/((plan_data.duration + plan_data.remaining_time) / + 3600)/(phases*230), plan_data.max_current) + message = self.SCHEDULED_CHARGING_MAX_CURRENT.format(round(current, 2)) + submode = "instant_charging" + else: + # Wenn dynamische Tarife aktiv sind, prüfen, ob jetzt ein günstiger Zeitpunkt zum Laden + # ist. + if self.data.et.active and data.data.optional_data.et_provider_available(): + hour_list = data.data.optional_data.et_get_loading_hours(plan_data.duration, plan_data.remaining_time) + log.debug(f"Günstige Ladezeiten: {hour_list}") + if timecheck.is_list_valid(hour_list): + message = self.SCHEDULED_CHARGING_CHEAP_HOUR + current = plan_data.available_current + submode = "instant_charging" + elif ((limit.selected == "soc" and soc <= limit.soc_limit) or + (limit.selected == "amount" and used_amount < limit.amount)): + message = self.SCHEDULED_CHARGING_EXPENSIVE_HOUR + current = min_current + submode = "pv_charging" + phases = control_parameter_phases + else: + message = self.SCHEDULED_REACHED_LIMIT_SOC + else: + # Wenn SoC-Limit erreicht wurde, soll nicht mehr mit Überschuss geladen werden + if limit.selected == "soc" and soc >= limit.soc_limit: + message = self.SCHEDULED_REACHED_LIMIT_SOC + else: + message = self.SCHEDULED_CHARGING_USE_PV + current = min_current + submode = "pv_charging" + phases = control_parameter_phases + return current, submode, message, phases + + def standby(self) -> Tuple[int, str, str]: + return 0, "standby", "Keine Ladung, da der Lademodus Standby aktiv ist." + + def stop(self) -> Tuple[int, str, str]: + return 0, "stop", "Keine Ladung, da der Lademodus Stop aktiv ist." diff --git a/packages/control/ev_charge_template_test.py b/packages/control/ev/charge_template_test.py similarity index 96% rename from packages/control/ev_charge_template_test.py rename to packages/control/ev/charge_template_test.py index ecf5a5dd0d..6425eada6b 100644 --- a/packages/control/ev_charge_template_test.py +++ b/packages/control/ev/charge_template_test.py @@ -5,8 +5,10 @@ from control import data from control import optional +from control.ev.charge_template import SelectedPlan from control.chargepoint.charging_type import ChargingType -from control.ev import ChargeTemplate, EvTemplate, EvTemplateData, SelectedPlan +from control.ev.ev import ChargeTemplate +from control.ev.ev_template import EvTemplate, EvTemplateData from control.general import General from helpermodules import timecheck from helpermodules.abstract_plans import Limit, ScheduledChargingPlan, TimeChargingPlan @@ -149,7 +151,7 @@ def test_scheduled_charging_recent_plan(params: Params, monkeypatch): get_phases_chargemode_mock = Mock(return_value=params.chargemode_phases) monkeypatch.setattr(data.data.general_data, "get_phases_chargemode", get_phases_chargemode_mock) search_plan_mock = Mock(return_value=params.search_plan) - monkeypatch.setattr(ChargeTemplate, "search_plan", search_plan_mock) + monkeypatch.setattr(ChargeTemplate, "_search_plan", search_plan_mock) evt_data = Mock(spec=EvTemplateData, max_current_multi_phases=16, max_current_single_phase=32) evt = Mock(spec=EvTemplate, data=evt_data) @@ -174,7 +176,7 @@ def test_calculate_duration(selected: str, phases: int, expected_duration: float plan = ScheduledChargingPlan() plan.limit.selected = selected # execution - duration, missing_amount = ct.calculate_duration(plan, 60, 45000, 200, phases, ChargingType.AC.value, EvTemplate()) + duration, missing_amount = ct._calculate_duration(plan, 60, 45000, 200, phases, ChargingType.AC.value, EvTemplate()) # evaluation assert duration == expected_duration @@ -196,14 +198,14 @@ def test_search_plan(check_duration_return1: Tuple[Optional[float], bool], monkeypatch): # setup calculate_duration_mock = Mock(return_value=(100, 200)) - monkeypatch.setattr(ChargeTemplate, "calculate_duration", calculate_duration_mock) + monkeypatch.setattr(ChargeTemplate, "_calculate_duration", calculate_duration_mock) check_duration_mock = Mock(side_effect=[check_duration_return1, check_duration_return2]) monkeypatch.setattr(timecheck, "check_duration", check_duration_mock) ct = ChargeTemplate(0) plan_mock = Mock(spec=ScheduledChargingPlan, active=True, current=14, limit=Limit(selected="amount")) ct.data.chargemode.scheduled_charging.plans = {0: plan_mock, 1: plan_mock} # execution - plan_data = ct.search_plan(14, 60, EvTemplate(), 3, 200, ChargingType.AC.value) + plan_data = ct._search_plan(14, 60, EvTemplate(), 3, 200, ChargingType.AC.value) # evaluation if expected_plan_num is None: diff --git a/packages/control/ev/ev.py b/packages/control/ev/ev.py new file mode 100644 index 0000000000..2930d47669 --- /dev/null +++ b/packages/control/ev/ev.py @@ -0,0 +1,469 @@ +""" EV-Logik +ermittelt, den Ladestrom, den das EV gerne zur Verfügung hätte. + +In den control Parametern wird sich der Lademodus, Submodus, Priorität, Phasen und Stromstärke gemerkt, +mit denen das EV aktuell in der Regelung berücksichtigt wird. Bei der Ermittlung der benötigten Strom- +stärke wird auch geprüft, ob sich an diesen Parametern etwas geändert hat. Falls ja, muss das EV +in der Regelung neu priorisiert werden und eine neue Zuteilung des Stroms erhalten. +""" +from dataclasses import dataclass, field +import logging +from typing import List, Optional, Tuple + +from control import data +from control.chargepoint.chargepoint_state import ChargepointState, PHASE_SWITCH_STATES +from control.chargepoint.charging_type import ChargingType +from control.chargepoint.control_parameter import ControlParameter +from control.ev.charge_template import ChargeTemplate +from control.ev.ev_template import EvTemplate +from control.limiting_value import LimitingValue +from dataclass_utils.factories import empty_list_factory +from helpermodules import timecheck +from helpermodules.constants import NO_ERROR +from modules.common.abstract_vehicle import VehicleUpdateData +from modules.common.configurable_vehicle import ConfigurableVehicle + +log = logging.getLogger(__name__) + + +def get_vehicle_default() -> dict: + return { + "charge_template": 0, + "ev_template": 0, + "name": "Fahrzeug", + "info": { + "manufacturer": None, + "model": None, + }, + "tag_id": [], + "get/soc": 0 + } + + +@dataclass +class Set: + soc_error_counter: int = field( + default=0, metadata={"topic": "set/soc_error_counter"}) + + +def set_factory() -> Set: + return Set() + + +@dataclass +class Get: + soc: Optional[int] = field(default=None, metadata={"topic": "get/soc"}) + soc_request_timestamp: Optional[float] = field( + default=None, metadata={"topic": "get/soc_request_timestamp"}) + soc_timestamp: Optional[float] = field( + default=None, metadata={"topic": "get/soc_timestamp"}) + force_soc_update: bool = field(default=False, metadata={ + "topic": "get/force_soc_update"}) + range: Optional[float] = field(default=None, metadata={"topic": "get/range"}) + fault_state: int = field(default=0, metadata={"topic": "get/fault_state"}) + fault_str: str = field(default=NO_ERROR, metadata={"topic": "get/fault_str"}) + + +def get_factory() -> Get: + return Get() + + +@dataclass +class EvData: + set: Set = field(default_factory=set_factory) + charge_template: int = field(default=0, metadata={"topic": "charge_template"}) + ev_template: int = field(default=0, metadata={"topic": "ev_template"}) + name: str = field(default="neues Fahrzeug", metadata={"topic": "name"}) + tag_id: List[str] = field(default_factory=empty_list_factory, metadata={ + "topic": "tag_id"}) + get: Get = field(default_factory=get_factory) + + +class Ev: + """Logik des EV + """ + + def __init__(self, index: int): + try: + self.ev_template: EvTemplate = EvTemplate() + self.charge_template: ChargeTemplate = ChargeTemplate(0) + self.soc_module: ConfigurableVehicle = None + self.chargemode_changed = False + self.submode_changed = False + self.num = index + self.data = EvData() + except Exception: + log.exception("Fehler im ev-Modul "+str(self.num)) + + def soc_interval_expired(self, vehicle_update_data: VehicleUpdateData) -> bool: + request_soc = False + if self.data.get.soc_request_timestamp is None: + # Initiale Abfrage + request_soc = True + else: + if vehicle_update_data.plug_state is True or self.soc_module.general_config.request_only_plugged is False: + if (vehicle_update_data.charge_state is True or + (self.data.set.soc_error_counter < 3 and self.data.get.fault_state == 2)): + interval = self.soc_module.general_config.request_interval_charging + else: + interval = self.soc_module.general_config.request_interval_not_charging + # Zeitstempel prüfen, ob wieder abgefragt werden muss. + if timecheck.check_timestamp(self.data.get.soc_request_timestamp, interval-5) is False: + # Zeit ist abgelaufen + request_soc = True + return request_soc + + def get_required_current(self, + control_parameter: ControlParameter, + imported: float, + max_phases_hw: int, + phase_switch_supported: bool, + charging_type: str) -> Tuple[bool, Optional[str], str, float, int]: + """ ermittelt, ob und mit welchem Strom das EV geladen werden soll (unabhängig vom Lastmanagement) + + Parameter + --------- + imported_since_mode_switch: float + seit dem letzten Lademodi-Wechsel geladene Energie. + Return + ------ + state: bool + Soll geladen werden? + message: str + Nachricht, warum nicht geladen werden soll + submode: str + Lademodus, in dem tatsächlich geladen wird + required_current: int + Strom, der nach Ladekonfiguration benötigt wird + """ + phases = None + required_current = None + submode = None + message = None + state = True + try: + if self.charge_template.data.chargemode.selected == "scheduled_charging": + if control_parameter.imported_at_plan_start is None: + control_parameter.imported_at_plan_start = imported + used_amount = imported - control_parameter.imported_at_plan_start + plan_data = self.charge_template.scheduled_charging_recent_plan( + self.data.get.soc, + self.ev_template, + control_parameter.phases, + used_amount, + max_phases_hw, + phase_switch_supported, + charging_type) + soc_request_interval_offset = 0 + if plan_data: + name = self.charge_template.data.chargemode.scheduled_charging.plans[plan_data.num].name + # Wenn mit einem neuen Plan geladen wird, muss auch die Energiemenge von neuem gezählt werden. + if (self.charge_template.data.chargemode.scheduled_charging.plans[plan_data.num].limit. + selected == "amount" and + name != control_parameter.current_plan): + control_parameter.imported_at_plan_start = imported + # Wenn der SoC ein paar Minuten alt ist, kann der Termin trotzdem gehalten werden. + # Zielladen kann nicht genauer arbeiten, als das Abfrageintervall vom SoC. + if (self.soc_module and + self.charge_template.data.chargemode. + scheduled_charging.plans[plan_data.num].limit.selected == "soc"): + soc_request_interval_offset = self.soc_module.general_config.request_interval_charging + control_parameter.current_plan = name + else: + control_parameter.current_plan = None + required_current, submode, message, phases = self.charge_template.scheduled_charging_calc_current( + plan_data, + self.data.get.soc, + used_amount, + control_parameter.phases, + control_parameter.min_current, + soc_request_interval_offset) + + # Wenn Zielladen auf Überschuss wartet, prüfen, ob Zeitladen aktiv ist. + if (submode != "instant_charging" and + self.charge_template.data.time_charging.active): + if control_parameter.imported_at_plan_start is None: + control_parameter.imported_at_plan_start = imported + used_amount = imported - control_parameter.imported_at_plan_start + tmp_current, tmp_submode, tmp_message, name = self.charge_template.time_charging( + self.data.get.soc, + used_amount, + charging_type + ) + # Info vom Zielladen erhalten + message = f"{message or ''} {tmp_message or ''}".strip() + if tmp_current > 0: + control_parameter.current_plan = name + # Wenn mit einem neuen Plan geladen wird, muss auch die Energiemenge von neuem gezählt werden. + if name != control_parameter.current_plan: + control_parameter.imported_at_plan_start = imported + required_current = tmp_current + submode = tmp_submode + if (required_current == 0) or (required_current is None): + if self.charge_template.data.chargemode.selected == "instant_charging": + # Wenn der Submode auf stop gestellt wird, wird auch die Energiemenge seit Wechsel des Modus + # zurückgesetzt, dann darf nicht die Energiemenge erneute geladen werden. + if control_parameter.imported_instant_charging is None: + control_parameter.imported_instant_charging = imported + used_amount = imported - control_parameter.imported_instant_charging + required_current, submode, message = self.charge_template.instant_charging( + self.data.get.soc, + used_amount, + charging_type) + elif self.charge_template.data.chargemode.selected == "pv_charging": + required_current, submode, message = self.charge_template.pv_charging( + self.data.get.soc, control_parameter.min_current, charging_type) + elif self.charge_template.data.chargemode.selected == "standby": + # Text von Zeit-und Zielladen nicht überschreiben. + if message is None: + required_current, submode, message = self.charge_template.standby() + else: + required_current, submode, _ = self.charge_template.standby() + elif self.charge_template.data.chargemode.selected == "stop": + required_current, submode, message = self.charge_template.stop() + if submode == "stop" or submode == "standby" or (self.charge_template.data.chargemode.selected == "stop"): + state = False + if phases is None: + phases = control_parameter.phases + return state, message, submode, required_current, phases + except Exception as e: + log.exception("Fehler im ev-Modul "+str(self.num)) + return (False, f"Kein Ladevorgang, da ein Fehler aufgetreten ist: {' '.join(e.args)}", "stop", 0, + control_parameter.phases) + + def set_chargemode_changed(self, control_parameter: ControlParameter, submode: str) -> None: + if ((submode == "time_charging" and control_parameter.chargemode != "time_charging") or + (submode != "time_charging" and + control_parameter.chargemode != self.charge_template.data.chargemode.selected)): + self.chargemode_changed = True + log.debug("Änderung des Lademodus") + else: + self.chargemode_changed = False + + def set_submode_changed(self, control_parameter: ControlParameter, submode: str) -> None: + self.submode_changed = (submode != control_parameter.submode) + + def check_min_max_current(self, + control_parameter: ControlParameter, + required_current: float, + phases: int, + charging_type: str, + pv: bool = False,) -> Tuple[float, Optional[str]]: + """ prüft, ob der gesetzte Ladestrom über dem Mindest-Ladestrom und unter dem Maximal-Ladestrom des EVs liegt. + Falls nicht, wird der Ladestrom auf den Mindest-Ladestrom bzw. den Maximal-Ladestrom des EV gesetzt. + Wenn PV-Laden aktiv ist, darf die Stromstärke nicht unter den PV-Mindeststrom gesetzt werden. + """ + msg = None + # Überprüfung bei 0 (automatische Umschaltung) erfolgt nach der Prüfung der Phasenumschaltung, wenn fest + # steht, mit vielen Phasen geladen werden soll. + if phases != 0: + # EV soll/darf nicht laden + if required_current != 0: + if not pv: + if charging_type == ChargingType.AC.value: + min_current = self.ev_template.data.min_current + else: + min_current = self.ev_template.data.dc_min_current + else: + min_current = control_parameter.required_current + if required_current < min_current: + required_current = min_current + msg = ("Die Einstellungen in dem Fahrzeug-Profil beschränken den Strom auf " + f"mindestens {required_current} A.") + else: + if charging_type == ChargingType.AC.value: + if phases == 1: + max_current = self.ev_template.data.max_current_single_phase + else: + max_current = self.ev_template.data.max_current_multi_phases + else: + max_current = self.ev_template.data.dc_max_current + if required_current > max_current: + required_current = max_current + msg = ("Die Einstellungen in dem Fahrzeug-Profil beschränken den Strom auf " + f"maximal {required_current} A.") + return required_current, msg + + CURRENT_OUT_OF_NOMINAL_DIFFERENCE = (", da das Fahrzeug nicht mit der vorgegebenen Stromstärke +/- der erlaubten " + + "Stromabweichung aus dem Fahrzeug-Profil/Minimalen Dauerstrom lädt.") + ENOUGH_POWER = ", da ausreichend Überschuss für mehrphasiges Laden zur Verfügung steht." + NOT_ENOUGH_POWER = ", da nicht ausreichend Überschuss für mehrphasiges Laden zur Verfügung steht." + + def _check_phase_switch_conditions(self, + control_parameter: ControlParameter, + get_currents: List[float], + get_power: float, + max_current_cp: int, + limit: LimitingValue) -> Tuple[bool, Optional[str]]: + # Manche EV laden mit 6.1A bei 6A Soll-Strom + min_current = (max(control_parameter.min_current, control_parameter.required_current) + + self.ev_template.data.nominal_difference) + max_current = (min(self.ev_template.data.max_current_single_phase, max_current_cp) + - self.ev_template.data.nominal_difference) + phases_in_use = control_parameter.phases + pv_config = data.data.general_data.data.chargemode_config.pv_charging + max_phases_ev = self.ev_template.data.max_phases + if self.charge_template.data.chargemode.pv_charging.feed_in_limit: + feed_in_yield = pv_config.feed_in_yield + else: + feed_in_yield = 0 + all_surplus = data.data.counter_all_data.get_evu_counter().get_usable_surplus(feed_in_yield) + required_surplus = control_parameter.min_current * max_phases_ev * 230 - get_power + condition_1_to_3 = (((max(get_currents) > max_current and + all_surplus > required_surplus) or limit == LimitingValue.UNBALANCED_LOAD.value) and + phases_in_use == 1) + condition_3_to_1 = max(get_currents) < min_current and all_surplus <= 0 and phases_in_use > 1 + if condition_1_to_3 or condition_3_to_1: + return True, None + else: + if phases_in_use > 1 and all_surplus > 0: + return False, self.ENOUGH_POWER + elif phases_in_use == 1 and all_surplus < required_surplus: + return False, self.NOT_ENOUGH_POWER + else: + return False, self.CURRENT_OUT_OF_NOMINAL_DIFFERENCE + + PHASE_SWITCH_DELAY_TEXT = '{} Phasen in {}.' + + def auto_phase_switch(self, + control_parameter: ControlParameter, + cp_num: int, + get_currents: List[float], + get_power: float, + max_current_cp: int, + max_phases: int, + limit: LimitingValue) -> Tuple[int, float, Optional[str]]: + message = None + current = control_parameter.required_current + timestamp_auto_phase_switch = control_parameter.timestamp_auto_phase_switch + phases_to_use = control_parameter.phases + phases_in_use = control_parameter.phases + pv_config = data.data.general_data.data.chargemode_config.pv_charging + cm_config = data.data.general_data.data.chargemode_config + if self.charge_template.data.chargemode.pv_charging.feed_in_limit: + feed_in_yield = pv_config.feed_in_yield + else: + feed_in_yield = 0 + all_surplus = data.data.counter_all_data.get_evu_counter().get_usable_surplus(feed_in_yield) + if phases_in_use == 1: + direction_str = f"Umschaltung von 1 auf {max_phases}" + delay = cm_config.phase_switch_delay * 60 + required_reserved_power = (control_parameter.min_current * max_phases * 230 - + self.ev_template.data.max_current_single_phase * 230) + + new_phase = max_phases + new_current = control_parameter.min_current + else: + direction_str = f"Umschaltung von {max_phases} auf 1" + delay = (16 - cm_config.phase_switch_delay) * 60 + # Es kann einphasig mit entsprechend niedriger Leistung gestartet werden. + required_reserved_power = 0 + new_phase = 1 + new_current = self.ev_template.data.max_current_single_phase + + log.debug( + f'Genutzter Strom: {max(get_currents)}A, Überschuss: {all_surplus}W, benötigte neue Leistung: ' + f'{required_reserved_power}W') + # Wenn gerade umgeschaltet wird, darf kein Timer gestartet werden. + if not self.ev_template.data.prevent_phase_switch: + condition, condition_msg = self._check_phase_switch_conditions(control_parameter, + get_currents, + get_power, + max_current_cp, + limit) + if control_parameter.state not in PHASE_SWITCH_STATES: + if condition: + # Umschaltverzögerung starten + timestamp_auto_phase_switch = timecheck.create_timestamp() + # Wenn nach der Umschaltung weniger Leistung benötigt wird, soll während der Verzögerung keine + # neuen eingeschaltet werden. + data.data.counter_all_data.get_evu_counter( + ).data.set.reserved_surplus += max(0, required_reserved_power) + message = self.PHASE_SWITCH_DELAY_TEXT.format( + direction_str, + timecheck.convert_timestamp_delta_to_time_string(timestamp_auto_phase_switch, delay)) + control_parameter.state = ChargepointState.PHASE_SWITCH_DELAY + elif condition_msg: + log.debug(f"Keine Phasenumschaltung{condition_msg}") + else: + if condition: + # Timer laufen lassen + if timecheck.check_timestamp(timestamp_auto_phase_switch, delay): + message = self.PHASE_SWITCH_DELAY_TEXT.format( + direction_str, + timecheck.convert_timestamp_delta_to_time_string(timestamp_auto_phase_switch, delay)) + else: + timestamp_auto_phase_switch = None + data.data.counter_all_data.get_evu_counter( + ).data.set.reserved_surplus -= max(0, required_reserved_power) + phases_to_use = new_phase + current = new_current + log.debug("Phasenumschaltung kann nun durchgeführt werden.") + control_parameter.state = ChargepointState.PHASE_SWITCH_AWAITED + else: + timestamp_auto_phase_switch = None + data.data.counter_all_data.get_evu_counter( + ).data.set.reserved_surplus -= max(0, required_reserved_power) + message = f"Verzögerung für die {direction_str} Phasen abgebrochen{condition_msg}" + control_parameter.state = ChargepointState.CHARGING_ALLOWED + + if message: + log.info(f"LP {cp_num}: {message}") + if timestamp_auto_phase_switch != control_parameter.timestamp_auto_phase_switch: + control_parameter.timestamp_auto_phase_switch = timestamp_auto_phase_switch + return phases_to_use, current, message + + def reset_phase_switch(self, control_parameter: ControlParameter): + """ Zurücksetzen der Zeitstempel und reservierten Leistung. + + Die Phasenumschaltung kann nicht abgebrochen werden! + """ + if control_parameter.timestamp_auto_phase_switch is not None: + control_parameter.timestamp_auto_phase_switch = None + # Wenn der Timer läuft, ist den Control-Parametern die alte Phasenzahl hinterlegt. + if control_parameter.phases == 1: + reserved = control_parameter.required_current * \ + 3 * 230 - self.ev_template.data.max_current_single_phase * 230 + data.data.counter_all_data.get_evu_counter().data.set.reserved_surplus -= reserved + log.debug( + "Zurücksetzen der reservierten Leistung für die Phasenumschaltung. reservierte Leistung: " + + str(data.data.counter_all_data.get_evu_counter().data.set.reserved_surplus)) + else: + reserved = self.ev_template.data.max_current_single_phase * \ + 230 - control_parameter.required_current * 3 * 230 + data.data.counter_all_data.get_evu_counter().data.set.reserved_surplus -= reserved + log.debug( + "Zurücksetzen der reservierten Leistung für die Phasenumschaltung. reservierte Leistung: " + + str(data.data.counter_all_data.get_evu_counter().data.set.reserved_surplus)) + control_parameter.state = ChargepointState.CHARGING_ALLOWED + + +def get_ev_to_rfid(rfid: str, vehicle_id: Optional[str] = None) -> Optional[int]: + """ ermittelt zum übergebenen ID-Tag das Fahrzeug + + Parameter + --------- + rfid: string + ID-Tag + vehicle_id: string + MAC-Adresse des ID-Tags (nur openWB Pro) + + Return + ------ + vehicle: int + Nummer des EV, das zum Tag gehört + """ + for vehicle in data.data.ev_data: + try: + if "ev" in vehicle: + if vehicle_id is not None and vehicle_id in data.data.ev_data[vehicle].data.tag_id: + log.debug(f"MAC {vehicle_id} wird EV {data.data.ev_data[vehicle].num} zugeordnet.") + return data.data.ev_data[vehicle].num + if rfid in data.data.ev_data[vehicle].data.tag_id: + log.debug(f"RFID {rfid} wird EV {data.data.ev_data[vehicle].num} zugeordnet.") + return data.data.ev_data[vehicle].num + except Exception: + log.exception("Fehler im ev-Modul "+vehicle) + return None + else: + return None diff --git a/packages/control/ev/ev_template.py b/packages/control/ev/ev_template.py new file mode 100644 index 0000000000..8da234a1ff --- /dev/null +++ b/packages/control/ev/ev_template.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass, field + + +@dataclass +class EvTemplateData: + dc_min_current: int = 0 + dc_max_current: int = 0 + name: str = "Fahrzeug-Profil" + max_current_multi_phases: int = 16 + max_phases: int = 3 + phase_switch_pause: int = 2 + prevent_phase_switch: bool = False + prevent_charge_stop: bool = False + control_pilot_interruption: bool = False + control_pilot_interruption_duration: int = 4 + average_consump: float = 17000 + min_current: int = 6 + max_current_single_phase: int = 16 + battery_capacity: float = 82000 + efficiency: float = 90 + nominal_difference: float = 1 + keep_charge_active_duration: int = 40 + + +def ev_template_data_factory() -> EvTemplateData: + return EvTemplateData() + + +@dataclass +class EvTemplate: + """ Klasse mit den EV-Daten + """ + + data: EvTemplateData = field(default_factory=ev_template_data_factory, metadata={ + "topic": "config"}) + et_num: int = 0 diff --git a/packages/control/ev_test.py b/packages/control/ev/ev_test.py similarity index 98% rename from packages/control/ev_test.py rename to packages/control/ev/ev_test.py index bcc2152042..638d08dec5 100644 --- a/packages/control/ev_test.py +++ b/packages/control/ev/ev_test.py @@ -3,7 +3,7 @@ import pytest -from control.ev import Ev +from control.ev.ev import Ev from helpermodules import timecheck from modules.common.abstract_vehicle import VehicleUpdateData from modules.vehicles.mqtt.config import MqttSocSetup diff --git a/packages/control/ocpp_test.py b/packages/control/ocpp_test.py index d9e3efa51f..858010eca9 100644 --- a/packages/control/ocpp_test.py +++ b/packages/control/ocpp_test.py @@ -5,7 +5,7 @@ from control.chargepoint.chargepoint import Chargepoint from control.chargepoint.chargepoint_template import CpTemplate from control.counter import Counter -from control.ev import Ev +from control.ev.ev import Ev from modules.chargepoints.mqtt.chargepoint_module import ChargepointModule from modules.chargepoints.mqtt.config import Mqtt diff --git a/packages/control/phase_switch.py b/packages/control/phase_switch.py index 4b4df1a51e..40537e7e08 100644 --- a/packages/control/phase_switch.py +++ b/packages/control/phase_switch.py @@ -4,7 +4,7 @@ import threading import time -from control.ev import Ev +from control.ev.ev import Ev from helpermodules.utils._thread_handler import is_thread_alive, thread_handler from modules.common.abstract_chargepoint import AbstractChargepoint diff --git a/packages/helpermodules/command.py b/packages/helpermodules/command.py index 41de1fc776..3f6658d447 100644 --- a/packages/helpermodules/command.py +++ b/packages/helpermodules/command.py @@ -28,7 +28,8 @@ from helpermodules.pub import Pub, pub_single from helpermodules.subdata import SubData from helpermodules.utils.topic_parser import decode_payload -from control import bat, bridge, data, ev, counter, counter_all, pv +from control import bat, bridge, data, counter, counter_all, pv +from control.ev import ev from modules.chargepoints.internal_openwb.chargepoint_module import ChargepointModule from modules.chargepoints.internal_openwb.config import InternalChargepointMode from modules.common.component_type import ComponentType, special_to_general_type_mapping, type_to_topic_mapping diff --git a/packages/helpermodules/data_migration/data_migration.py b/packages/helpermodules/data_migration/data_migration.py index 810b0d0e51..369641a846 100644 --- a/packages/helpermodules/data_migration/data_migration.py +++ b/packages/helpermodules/data_migration/data_migration.py @@ -18,7 +18,8 @@ from threading import Thread from typing import Callable, Dict, List, Optional, Union -from control import data, ev +from control import data +from control.ev import ev from dataclass_utils import dataclass_from_dict import dataclass_utils from helpermodules.data_migration.id_mapping import MapId diff --git a/packages/helpermodules/measurement_logging/update_yields.py b/packages/helpermodules/measurement_logging/update_yields.py index 73961bae15..02ecccbc08 100644 --- a/packages/helpermodules/measurement_logging/update_yields.py +++ b/packages/helpermodules/measurement_logging/update_yields.py @@ -5,12 +5,12 @@ from control import data from control.chargepoint.chargepoint import Chargepoint +from control.ev.ev import Ev from control.pv_all import PvAll +from control.pv import Pv from helpermodules import timecheck from helpermodules.measurement_logging.process_log import get_totals from helpermodules.pub import Pub -from control.ev import Ev -from control.pv import Pv log = logging.getLogger(__name__) diff --git a/packages/helpermodules/subdata.py b/packages/helpermodules/subdata.py index b0ac0450ea..e7f8520fb2 100644 --- a/packages/helpermodules/subdata.py +++ b/packages/helpermodules/subdata.py @@ -9,15 +9,18 @@ import subprocess import paho.mqtt.client as mqtt -from control import bat_all, bat, counter, counter_all, ev, general, optional, pv, pv_all +from control import bat_all, bat, counter, counter_all, general, optional, pv, pv_all from control.chargepoint import chargepoint from control.chargepoint.chargepoint_all import AllChargepoints from control.chargepoint.chargepoint_data import Log from control.chargepoint.chargepoint_state_update import ChargepointStateUpdate from control.chargepoint.chargepoint_template import CpTemplate, CpTemplateData +from control.ev.charge_template import ChargeTemplate, ChargeTemplateData +from control.ev import ev +from control.ev.ev_template import EvTemplate, EvTemplateData from control.optional_data import Ocpp from helpermodules import graph, system -from helpermodules.abstract_plans import AutolockPlan +from helpermodules.abstract_plans import AutolockPlan, ScheduledChargingPlan, TimeChargingPlan from helpermodules.broker import InternalBrokerClient from helpermodules.messaging import MessageType, pub_system_message from helpermodules.utils.run_command import run_command @@ -49,8 +52,8 @@ class SubData: pv_data: Dict[str, pv.Pv] = {} pv_all_data = pv_all.PvAll() ev_data: Dict[str, ev.Ev] = {} - ev_template_data: Dict[str, ev.EvTemplate] = {} - ev_charge_template_data: Dict[str, ev.ChargeTemplate] = {} + ev_template_data: Dict[str, EvTemplate] = {} + ev_charge_template_data: Dict[str, ChargeTemplate] = {} counter_data: Dict[str, counter.Counter] = {} counter_all_data = counter_all.CounterAll() bat_all_data = bat_all.BatAll() @@ -331,7 +334,7 @@ def process_vehicle_charge_template_topic(self, var: Dict[str, ev.ChargeTemplate str(index_second)+" in dem Lade-Profil "+str(index)+" gefunden werden.") else: var["ct"+index].data.chargemode.scheduled_charging.plans[ - index_second] = dataclass_from_dict(ev.ScheduledChargingPlan, decode_payload(msg.payload)) + index_second] = dataclass_from_dict(ScheduledChargingPlan, decode_payload(msg.payload)) self.event_scheduled_charging_plan.set() elif re.search("/vehicle/template/charge_template/[0-9]+/time_charging/plans/[0-9]+$", msg.topic) is not None: @@ -344,20 +347,20 @@ def process_vehicle_charge_template_topic(self, var: Dict[str, ev.ChargeTemplate str(index_second)+" in dem Lade-Profil "+str(index)+" gefunden werden.") else: var["ct"+index].data.time_charging.plans[ - index_second] = dataclass_from_dict(ev.TimeChargingPlan, decode_payload(msg.payload)) + index_second] = dataclass_from_dict(TimeChargingPlan, decode_payload(msg.payload)) self.event_time_charging_plan.set() else: # Pläne unverändert übernehmen scheduled_charging_plans = var["ct" + index].data.chargemode.scheduled_charging.plans time_charging_plans = var["ct" + index].data.time_charging.plans - var["ct" + index].data = dataclass_from_dict(ev.ChargeTemplateData, decode_payload(msg.payload)) + var["ct" + index].data = dataclass_from_dict(ChargeTemplateData, decode_payload(msg.payload)) var["ct"+index].data.time_charging.plans = time_charging_plans var["ct"+index].data.chargemode.scheduled_charging.plans = scheduled_charging_plans self.event_charge_template.set() except Exception: log.exception("Fehler im subdata-Modul") - def process_vehicle_ev_template_topic(self, var: Dict[str, ev.EvTemplate], msg: mqtt.MQTTMessage): + def process_vehicle_ev_template_topic(self, var: Dict[str, EvTemplate], msg: mqtt.MQTTMessage): """ Handler für die EV-Topics Parameter @@ -375,8 +378,8 @@ def process_vehicle_ev_template_topic(self, var: Dict[str, ev.EvTemplate], msg: var.pop("et"+index) else: if "et"+index not in var: - var["et"+index] = ev.EvTemplate(et_num=int(index)) - var["et" + index].data = dataclass_from_dict(ev.EvTemplateData, decode_payload(msg.payload)) + var["et"+index] = EvTemplate(et_num=int(index)) + var["et" + index].data = dataclass_from_dict(EvTemplateData, decode_payload(msg.payload)) self.event_ev_template.set() except Exception: log.exception("Fehler im subdata-Modul") diff --git a/packages/helpermodules/timecheck_test.py b/packages/helpermodules/timecheck_test.py index e8a0ce920e..0f602383ae 100644 --- a/packages/helpermodules/timecheck_test.py +++ b/packages/helpermodules/timecheck_test.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock, Mock import pytest -from control.ev import ChargeTemplate +from control.ev.charge_template import ChargeTemplate from helpermodules import timecheck from helpermodules.abstract_plans import AutolockPlan, Frequency, ScheduledChargingPlan, TimeChargingPlan diff --git a/packages/helpermodules/update_config.py b/packages/helpermodules/update_config.py index 428e4f22f6..a1ff6fbc36 100644 --- a/packages/helpermodules/update_config.py +++ b/packages/helpermodules/update_config.py @@ -8,11 +8,7 @@ import time from typing import List, Optional from paho.mqtt.client import Client as MqttClient, MQTTMessage -from control.bat_all import BatConsiderationMode -from control.chargepoint.charging_type import ChargingType -from control.counter import get_counter_default_config -from control.general import ChargemodeConfig -from control.optional_data import Ocpp + import dataclass_utils from control.chargepoint.chargepoint_template import get_chargepoint_template_default @@ -33,8 +29,14 @@ from helpermodules.utils.run_command import run_command from helpermodules.utils.topic_parser import decode_payload, get_index, get_second_index from control import counter_all -from control import ev -from control.general import Prices +from control.bat_all import BatConsiderationMode +from control.chargepoint.charging_type import ChargingType +from control.counter import get_counter_default_config +from control.ev.charge_template import get_charge_template_default +from control.ev import ev +from control.ev.ev_template import EvTemplateData +from control.general import ChargemodeConfig, Prices +from control.optional_data import Ocpp from modules.common.abstract_vehicle import GeneralVehicleConfig from modules.common.component_type import ComponentType from modules.devices.sungrow.sungrow.version import Version @@ -456,9 +458,9 @@ class UpdateConfig: ("openWB/vehicle/0/ev_template", ev.Ev(0).ev_template.et_num), ("openWB/vehicle/0/tag_id", ev.Ev(0).data.tag_id), ("openWB/vehicle/0/get/soc", ev.Ev(0).data.get.soc), - ("openWB/vehicle/template/ev_template/0", asdict(ev.EvTemplateData(name="Standard-Fahrzeug-Profil", - min_current=10))), - ("openWB/vehicle/template/charge_template/0", ev.get_charge_template_default()), + ("openWB/vehicle/template/ev_template/0", asdict(EvTemplateData(name="Standard-Fahrzeug-Profil", + min_current=10))), + ("openWB/vehicle/template/charge_template/0", get_charge_template_default()), ("openWB/general/charge_log_data_config", get_default_charge_log_columns()), ("openWB/general/chargemode_config/instant_charging/phases_to_use", 3), ("openWB/general/chargemode_config/pv_charging/bat_mode", BatConsiderationMode.EV_MODE.value), diff --git a/packages/modules/update_soc.py b/packages/modules/update_soc.py index ce8e87de50..a81071ffb1 100644 --- a/packages/modules/update_soc.py +++ b/packages/modules/update_soc.py @@ -5,7 +5,7 @@ from threading import Event, Thread from control import data -from control.ev import Ev +from control.ev.ev import Ev from helpermodules import subdata from helpermodules import timecheck from helpermodules.constants import NO_ERROR diff --git a/packages/modules/update_soc_test.py b/packages/modules/update_soc_test.py index 640a4b440b..0a3930d620 100644 --- a/packages/modules/update_soc_test.py +++ b/packages/modules/update_soc_test.py @@ -7,9 +7,10 @@ from control.chargepoint.chargepoint import Chargepoint from control.chargepoint.chargepoint_data import Get, Log, Set from control.chargepoint.chargepoint_state_update import ChargepointStateUpdate -from control.ev import Ev, EvData, EvTemplate, EvTemplateData -from control.ev import Get as EvGet -from control.ev import Set as EvSet +from control.ev.ev import Ev, EvData +from control.ev.ev_template import EvTemplate, EvTemplateData +from control.ev.ev import Get as EvGet +from control.ev.ev import Set as EvSet from helpermodules.subdata import SubData from modules.common.abstract_vehicle import GeneralVehicleConfig, VehicleUpdateData from modules.common.configurable_vehicle import ConfigurableVehicle