Skip to content

Commit

Permalink
tests on event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
XdoctorwhoZ committed Nov 21, 2023
1 parent a1ef14f commit 9f126a3
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 101 deletions.
30 changes: 28 additions & 2 deletions platform/panduza_platform/core/monitored_event_loop.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import threading
import sys

# ---

Expand Down Expand Up @@ -26,8 +28,32 @@ def __init__(self, log, *args, **kwargs):
def run_forever(self):
self.ref_time = self.time()
try:
# self.log.info(f"EVENT LOOP RUN")
super().run_forever()
self.log.info(f"EVENT LOOP RUN")
self._check_closed()
self._check_running()
self._set_coroutine_origin_tracking(self._debug)
self._thread_id = threading.get_ident()

old_agen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
try:
asyncio.events._set_running_loop(self)

while True:
try:
self._run_once()
if self._stopping:
break
except KeyboardInterrupt:
self.log.warning("ctrl+c => user stop requested !!!!!!!!!!!!")
self._stopping = True
finally:
self._stopping = False
self._thread_id = None
asyncio.events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
sys.set_asyncgen_hooks(*old_agen_hooks)
finally:
finished = self.time()
# self._total_time = finished - started
Expand Down
2 changes: 2 additions & 0 deletions platform/panduza_platform/core/mqtt_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ async def publish_json(self, topic, req: dict, qos=0, retain=False):
# WORKER FUNCTIONS

def stop(self):
self.log.warning("stop !")
self.mqtt_client.disconnect()
self.misc.cancel()
super().stop()

# ---
Expand Down
203 changes: 121 additions & 82 deletions platform/panduza_platform/core/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from sys import platform

from core.monitored_event_loop import MonitoredEventLoop
from core.platform_event_loop import PlatformEventLoop

from .conf import PLATFORM_VERSION
from log.platform import platform_logger
Expand Down Expand Up @@ -138,8 +138,16 @@ async def mount_client(self, name, host_addr, host_port):

# ---

def unmount_client(self):
pass
def unmount_client(self, client):
client.stop()
self.devices.remove(client)

# ---

async def unmount_all_clients(self):
for client in self.clients:
# if not client.keep_mounted:
await self.unmount_client(client)

# ---

Expand All @@ -160,19 +168,31 @@ async def mount_device(self, client_name, group_name, device_cfg, keep_mounted =

# ---

async def unmount_device(self, device):
def unmount_device(self, device):
device.unmount_interfaces()
self.devices.remove(device)

# ---

async def unmount_device_async(self, device):
await device.unmount_interfaces()
self.devices.remove(device)

# ---

async def unmount_all_devices(self):
def unmount_all_devices(self):
for device in self.devices:
if not device.keep_mounted:
await self.unmount_device(device)
self.unmount_device(device)

# ---

async def unmount_all_devices_async(self):
for device in self.devices:
if not device.keep_mounted:
await self.unmount_device(device)
# ---

def get_number_of_device(self):
"""Number fo devices mounted on this platform
"""
Expand All @@ -197,7 +217,7 @@ async def load_config_content_task(self, new_dtree):
self.log.debug(f"load config:{json.dumps(new_dtree, indent=1)}")

# Remove mounted device from the previous configuration
await self.unmount_all_devices()
await self.unmount_all_devices_async()

# Mount each device
devices = new_dtree.get("devices", [])
Expand All @@ -218,7 +238,7 @@ async def handle_worker_panic(self, name, status):
self.log.error( str(status.get("error_string", "")) + "\n" )

# Remove mounted device from the previous configuration
await self.unmount_all_devices()
await self.unmount_all_devices_async()

# ---

Expand Down Expand Up @@ -255,37 +275,63 @@ async def __idle_task(self):
while(self.alive):
await asyncio.sleep(1)

self.log.info("end of idle task")

# ---

async def __dying_gasp_task(self):
"""Perform the dying gasp procedure
"""
#
self.log.warning(f"Dying Gasp !")

# Start the global task group
async with asyncio.TaskGroup() as self.task_group:

# Connect to primary broker
await self.mount_client("primary", "localhost", 1883)

# Mount the device interfaces of the server
await self.mount_device("primary", "server",
{
"name": socket.gethostname(),
"ref": "Panduza.Server",
"settings": {
"dying_gasp": True
}
}
)


# ---

def __oper_mode(self):
"""Run the operational mode
This mode start the main event loop then the initialisation task
"""
try:
# Manage the status file (file to indicate the admin interface logs of the crash)
if os.path.isfile(STATUS_FILE_PATH):
os.remove(STATUS_FILE_PATH)

# Start the loop and monitor activity
# If the debug flag is enabled, start monitored event loop
if self.event_loop_debug:
self.event_loop = MonitoredEventLoop(self.log)
asyncio.set_event_loop(self.event_loop)
with aiomonitor.start_monitor(self.event_loop):
self.event_loop.run_until_complete(self.__idle_task())
else:
self.event_loop = asyncio.get_event_loop()
# try:

# Start the loop and monitor activity
# If the debug flag is enabled, start monitored event loop
if self.event_loop_debug:
self.event_loop = PlatformEventLoop(self)
asyncio.set_event_loop(self.event_loop)
with aiomonitor.start_monitor(self.event_loop):
self.event_loop.run_until_complete(self.__idle_task())
# If release do not enable debug features
else:
self.event_loop = asyncio.get_event_loop()
self.event_loop.run_until_complete(self.__idle_task())

except InitializationError as e:
self.log.critical(f"Error during platform initialization: {e}")
self.generate_early_status_report(str(e))
except KeyboardInterrupt:
self.log.warning("ctrl+c => user stop requested")
self.__stop()
except FileNotFoundError:
self.log.critical(f"Platform configuration file 'tree.json' has not been found at location '{self.dtree_filepath}' !!==>> STOP PLATFORM")
# except InitializationError as e:
# self.log.critical(f"Error during platform initialization: {e}")
# self.generate_early_status_report(str(e))
# except KeyboardInterrupt:
# self.log.warning("ctrl+c => user stop requested")
# self.stop()
# except FileNotFoundError:
# self.log.critical(f"Platform configuration file 'tree.json' has not been found at location '{self.dtree_filepath}' !!==>> STOP PLATFORM")

# ---

Expand Down Expand Up @@ -315,74 +361,67 @@ async def __load_tree_task(self):

# ---

def __stop(self):
"""To stop the entire platform
def stop(self):
"""To stop the entire platform, just perform dying gasp
"""
# self.log.warning("")

# Stop alive flag
self.alive = False

#
self.log.warning("Platform stopping...")
for thr in self.threads:
thr.stop()
self.unmount_all_devices()
# self.unmount_all_clients()

#
for thr in self.threads:
thr.join()

# Generate status reports
self.generate_status_reports()
# # ---

# def generate_early_status_report(self, error_string):
# """Generate a status report when something went wrong during initialization
# """
# status_obj = {}

# status_obj["final_state"] = "initialization"
# status_obj["error_string"] = error_string
# status_obj["threads"] = []

# ---
# # Write the status file
# with open(STATUS_FILE_PATH, "w") as json_file:
# json.dump(status_obj, json_file)

def generate_early_status_report(self, error_string):
"""Generate a status report when something went wrong during initialization
"""
status_obj = {}
# # ---

status_obj["final_state"] = "initialization"
status_obj["error_string"] = error_string
status_obj["threads"] = []
# def generate_status_reports(self):
# """Generate a json report status and log it to the console
# """

# Write the status file
with open(STATUS_FILE_PATH, "w") as json_file:
json.dump(status_obj, json_file)
# status_obj = {}
# status_obj["final_state"] = "running"

# ---
# # Gather the status of each thread
# thread_status = []
# for thr in self.threads:
# thread_status.append(thr.get_status())

def generate_status_reports(self):
"""Generate a json report status and log it to the console
"""
# #
# status_obj["threads"] = thread_status

status_obj = {}
status_obj["final_state"] = "running"
# # Write the status file
# with open(STATUS_FILE_PATH, "w") as json_file:
# json.dump(status_obj, json_file)

# Gather the status of each thread
thread_status = []
for thr in self.threads:
thread_status.append(thr.get_status())
# # Print into the console
# report = "\n"
# for thr in thread_status:
# report += "=================================\n"
# report +=f"== {thr['name']} \n"
# report += "=================================\n"

#
status_obj["threads"] = thread_status

# Write the status file
with open(STATUS_FILE_PATH, "w") as json_file:
json.dump(status_obj, json_file)

# Print into the console
report = "\n"
for thr in thread_status:
report += "=================================\n"
report +=f"== {thr['name']} \n"
report += "=================================\n"

for w in thr['workers']:
report += "\n"
report += str(w.get("name", "")) + "\n"
report += str(w.get("final_state", "")) + "\n"
report += str(w.get("error_string", "")) + "\n"
self.log.info(report)
# for w in thr['workers']:
# report += "\n"
# report += str(w.get("name", "")) + "\n"
# report += str(w.get("final_state", "")) + "\n"
# report += str(w.get("error_string", "")) + "\n"
# self.log.info(report)



Expand Down
9 changes: 8 additions & 1 deletion platform/panduza_platform/core/platform_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,21 @@ async def mount_interfaces(self):

# ---

async def unmount_interfaces(self):
def unmount_interfaces(self):
"""
"""
for itf in self.interfaces:
itf.unmount()

# ---

async def unmount_interfaces_async(self):
"""
"""
self.unmount_interfaces()

# ---

def mount_interface(self, itf):
"""Mount an interface
"""
Expand Down
Loading

0 comments on commit 9f126a3

Please sign in to comment.