Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding AC & DC HLC tests to core_tests/startup_test #830

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 68 additions & 5 deletions tests/core_tests/startup_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import threading
import queue

from enum import Enum

from everest.testing.core_utils.fixtures import *
from everest.testing.core_utils.everest_core import EverestCore, Requirement

Expand All @@ -20,6 +22,10 @@ async def test_000_startup_check(everest_core: EverestCore):
logging.info(">>>>>>>>> test_000_startup_check <<<<<<<<<")
everest_core.start()

class Mode(Enum):
Basic = 0
HlcAc = 1
HlcDc = 2

class ProbeModule:
def __init__(self, session: RuntimeSession):
Expand All @@ -32,6 +38,7 @@ def __init__(self, session: RuntimeSession):
self._handle_evse_manager_event)

self._msg_queue = queue.Queue()
self._energy_wh_import = 0

self._ready_event = threading.Event()
self._mod = m
Expand All @@ -42,8 +49,10 @@ def _ready(self):

def _handle_evse_manager_event(self, args):
self._msg_queue.put(args['event'])
if 'transaction_finished' in args:
self._energy_wh_import = args['transaction_finished']['meter_value']['energy_Wh_import']['total']

def test(self, timeout: float) -> bool:
def test(self, timeout: float, mode: Mode) -> bool:
end_of_time = time.time() + timeout

logging.info("Wating for ready event...")
Expand All @@ -57,9 +66,15 @@ def test(self, timeout: float) -> bool:
# enable simulator
self._mod.call_command(car_sim_ff, 'enable', {'value': True})

cmd = {'value': 'sleep 1;iec_wait_pwr_ready;sleep 1;draw_power_regulated 16,3;sleep 10;unplug'}

if mode == Mode.HlcAc:
cmd = {'value': 'sleep 1;iso_wait_slac_matched;iso_start_v2g_session AC;iso_wait_pwr_ready;iso_draw_power_regulated 16,3;iso_wait_for_stop 10;iso_wait_v2g_session_stopped;unplug'}
elif mode == Mode.HlcDc:
cmd = {'value': 'sleep 1;iso_wait_slac_matched;iso_start_v2g_session DC;iso_wait_pwr_ready;iso_wait_for_stop 10;iso_wait_v2g_session_stopped;unplug'}

# start charging simulation
self._mod.call_command(car_sim_ff, 'execute_charging_session', {
'value': 'sleep 1;iec_wait_pwr_ready;sleep 1;draw_power_regulated 16,3;sleep 5;unplug'})
self._mod.call_command(car_sim_ff, 'execute_charging_session', cmd)

expected_events = ['TransactionStarted', 'TransactionFinished']

Expand All @@ -80,7 +95,9 @@ def test(self, timeout: float) -> bool:
expected_events.pop(0)
except queue.Empty:
return False

logging.info("Total energy import: %f Wh", self._energy_wh_import)
if self._energy_wh_import <= 0:
return False
return True


Expand All @@ -105,4 +122,50 @@ async def test_001_start_test_module(everest_core: EverestCore):
everest_core.all_modules_started_event.set()
logging.info("set all modules started event...")

assert probe.test(20)
assert probe.test(20, Mode.Basic)

@pytest.mark.everest_core_config('config-sil.yaml')
@pytest.mark.asyncio
async def test_002_start_test_module_ac_hlc(everest_core: EverestCore):
logging.info(">>>>>>>>> test_002_start_test_module_ac_hlc <<<<<<<<<")

test_connections = {
'test_control': [Requirement('ev_manager', 'main')],
'connector_1': [Requirement('connector_1', 'evse')]
}

everest_core.start(standalone_module='probe', test_connections=test_connections)
logging.info("everest-core ready, waiting for probe module")

session = RuntimeSession(str(everest_core.prefix_path), str(everest_core.everest_config_path))

probe = ProbeModule(session)

if everest_core.status_listener.wait_for_status(10, ["ALL_MODULES_STARTED"]):
everest_core.all_modules_started_event.set()
logging.info("set all modules started event...")

assert probe.test(40, Mode.HlcAc)

@pytest.mark.everest_core_config('config-sil-dc.yaml')
@pytest.mark.asyncio
async def test_003_start_test_module_dc(everest_core: EverestCore):
logging.info(">>>>>>>>> test_003_start_test_module_dc <<<<<<<<<")

test_connections = {
'test_control': [Requirement('ev_manager', 'main')],
'connector_1': [Requirement('evse_manager', 'evse')]
}

everest_core.start(standalone_module='probe', test_connections=test_connections)
logging.info("everest-core ready, waiting for probe module")

session = RuntimeSession(str(everest_core.prefix_path), str(everest_core.everest_config_path))

probe = ProbeModule(session)

if everest_core.status_listener.wait_for_status(10, ["ALL_MODULES_STARTED"]):
everest_core.all_modules_started_event.set()
logging.info("set all modules started event...")

assert probe.test(40, Mode.HlcDc)
Loading