Skip to content

Commit

Permalink
continue thread removal and event loop core improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodriguez committed Oct 22, 2023
1 parent ca78446 commit 908f3ae
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 59 deletions.
6 changes: 3 additions & 3 deletions platform/panduza_platform/core/monitored_event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ def __init__(self, log, *args, **kwargs):
self.log = log
self.perf_cycle_time = 2

self.log.info(f"EVENT LOOP UP !!")
# self.log.info(f"EVENT LOOP UP !!")

# ---

# TOTAL TIME:
def run_forever(self):
self.ref_time = self.time()
try:
self.log.info(f"EVENT LOOP RUN")
# self.log.info(f"EVENT LOOP RUN")
super().run_forever()
finally:
finished = self.time()
Expand All @@ -50,7 +50,7 @@ def _process_events(self, *args, **kwargs):
super()._process_events(*args, **kwargs)

cycle_time = self.time() - self.ref_time
self.log.info(f"EVENT {cycle_time}")
# self.log.info(f"EVENT {cycle_time}")
if cycle_time >= self.perf_cycle_time:

work_time = cycle_time - self._select_time
Expand Down
40 changes: 24 additions & 16 deletions platform/panduza_platform/core/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ def run(self):

# ---

async def load_worker(self, worker):
def load_worker(self, worker):
"""Load worker into the event loop
"""
await self.event_loop.create_task( worker.task(), name=worker.name() )
self.log.info(f"Load '{worker}'")
self.task_group.create_task( worker.task(), name=worker.PZA_WORKER_name() )

# ---

Expand All @@ -128,9 +129,11 @@ async def load_worker(self, worker):
async def mount_client(self, name, host_addr, host_port):
"""Mount a mqtt client
"""
self.log.info(f"Mount client '{name}'")
mqtt_client = MqttAsyncClient(host_addr, host_port)
self.clients[name] = mqtt_client
await self.load_worker(mqtt_client)
# await
self.load_worker(mqtt_client)

# ---

Expand All @@ -143,7 +146,7 @@ async def mount_device(self, client_name, group_name, device_cfg):
"""Mount a device
"""
# Debug log
self.log.info(f"Mount device {device_cfg}")
self.log.info(f"Mount device '{device_cfg}'")

# Produce the device instance
device_instance = self.device_factory.produce_device(self.clients[client_name], group_name, device_cfg)
Expand Down Expand Up @@ -294,19 +297,24 @@ async def __idle_task(self):
# Start idle task
self.log.info(f"Idle task start")

# Connect to primary broker
await self.mount_client("primary", "localhost", 1883)

# Mount the device interfaces of the server
await self.mount_device("primary", "server", {
"name": socket.gethostname(),
"ref": "Panduza.Server",
}
)
#
async with asyncio.TaskGroup() as self.task_group:

# Wait for ever
while(self.alive):
await asyncio.sleep(1)
# Connect to primary broker
await self.mount_client("primary", "localhost", 1883)

self.log.info(f"!!!!!!!!!!!!")

# Mount the device interfaces of the server
await self.mount_device("primary", "server", {
"name": socket.gethostname(),
"ref": "Panduza.Server",
}
)

# Wait for ever
while(self.alive):
await asyncio.sleep(1)

# ---

Expand Down
16 changes: 8 additions & 8 deletions platform/panduza_platform/core/platform_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ def __init__(self, platform=None, client=None, group_name=None, name=None, setti
self.log = device_logger("device_" + str(self.__name))

# Settings json provided by the user with the tree.json
self.__platform = platform
self.platform = platform


self.__client = client
self.__group_name = group_name
self.group_name = group_name

# Settings json provided by the user with the tree.json
self.__settings = settings
self.settings = settings

# Interfaces linked to this device
self.__interfaces = []
Expand All @@ -46,11 +46,11 @@ async def mount_interfaces(self):

# ---

async def mount_interface(self, itf):
def mount_interface(self, itf):
"""
"""
itf.attach_client(self.__client)
await self.__platform.load_worker(itf)
self.platform.load_worker(itf)

# ---

Expand All @@ -67,7 +67,7 @@ def number_of_interfaces(self):
def get_settings(self):
"""Return settings provided by the user for this device
"""
return self.__settings
return self.settings

# ---

Expand Down Expand Up @@ -159,8 +159,8 @@ def _PZA_DEV_unique_name_generator(self):
By default this function does not support multiple instance of the same device on the smae bench.
Because with this simple method, they will have the same name.
"""
# name = self.__settings.get("name", None)
# print("!!!!!!!!!!!", self.__settings)
# name = self.settings.get("name", None)
# print("!!!!!!!!!!!", self.settings)
if self.__name:
return self.__name
else:
Expand Down
6 changes: 3 additions & 3 deletions platform/panduza_platform/core/platform_device_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def __init__(self, parent_platform):
# The factory is composed of builders
self.__device_templates = {}

self.__platform = parent_platform
self.__log = self.__platform.log
self.platform = parent_platform
self.__log = self.platform.log

# ---

Expand All @@ -36,7 +36,7 @@ def produce_device(self, client, group_name, config):

# Produce the device
try:
producted_device = self.__device_templates[ref](platform=self.__platform, client=client, group_name=group_name, name=name, settings=config.get("settings", {}))
producted_device = self.__device_templates[ref](platform=self.platform, client=client, group_name=group_name, name=name, settings=config.get("settings", {}))
producted_device.initialize()
return producted_device
except Exception as e:
Expand Down
32 changes: 19 additions & 13 deletions platform/panduza_platform/core/platform_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,12 @@ def initialize(self):
# raise Exception("Name of the interface is required !")

# Get name
print(self.tree)
self.name = self.tree["name"]
# self.name = "pok"

# Init logger
self.device_name = "TMP"
self.worker_name = f"{self.bench_name}/{self.device_name}/{self.name}"
self.log = driver_logger(self.worker_name)

Expand Down Expand Up @@ -207,6 +210,8 @@ async def PZA_WORKER_task(self):
"""
"""

self.log.info(f"pokkkk task {self.__drv_state} {self.__err_string}")

try:

# Process Scan Events
Expand All @@ -221,7 +226,7 @@ async def PZA_WORKER_task(self):

# Managed message
self._state_started_time = time.time()
self.log.debug(f"STATE CHANGE ::: {self.__drv_state_prev} => {self.__drv_state}")
self.log.info(f"STATE CHANGE ::: {self.__drv_state_prev} => {self.__drv_state}")
self.__drv_state_prev = self.__drv_state

# Manage the errstring
Expand Down Expand Up @@ -257,31 +262,32 @@ async def alive_task(self):

# ---

async def __drv_state_init(self, loop):
async def __drv_state_init(self):
"""
"""
loop.create_task(self.alive_task())
# self.platform.load
# loop.create_task(self.alive_task())
self.__subscribe_topics()
await self._PZA_DRV_loop_init(loop, self.tree)
await self._PZA_DRV_loop_init()

# ---

async def __drv_state_run(self, loop):
async def __drv_state_run(self):
"""
"""
try:
await self._PZA_DRV_loop_run(loop)
await self._PZA_DRV_loop_run()
except Exception as e:
self._PZA_DRV_error_detected(str(e) + " " + traceback.format_exc())

# ---

async def __drv_state_err(self, loop):
async def __drv_state_err(self):
"""
"""
try:
self.worker_panic()
await self._PZA_DRV_loop_err(loop)
await self._PZA_DRV_loop_err()
except Exception as e:
self.log.error(str(e))

Expand Down Expand Up @@ -522,17 +528,17 @@ def _PZA_DRV_hunt_instances(self):
"""
return []

async def _PZA_DRV_loop_init(self, loop, tree):
async def _PZA_DRV_loop_init(self):
"""
"""
self._PZA_DRV_init_success()

async def _PZA_DRV_loop_run(self, loop):
async def _PZA_DRV_loop_run(self):
"""
"""
await asyncio.sleep(0.1)

async def _PZA_DRV_loop_err(self, loop):
async def _PZA_DRV_loop_err(self):
"""
"""
elasped = time.time() - self._state_started_time
Expand All @@ -542,7 +548,7 @@ async def _PZA_DRV_loop_err(self, loop):
await asyncio.sleep(1)
self.log.debug(f"restart in { int(PlatformDriver.ERROR_TIME_BEFORE_RETRY_S - elasped) }s")

async def _PZA_DRV_cmds_set(self, loop, payload):
async def _PZA_DRV_cmds_set(self, payload):
"""Must apply the command on the driver
"""
pass
Expand Down Expand Up @@ -592,7 +598,7 @@ async def _PZA_DRV_cmds_set(self, loop, payload):

# ---

async def __process_scan_events(self, loop):
async def __process_scan_events(self):
"""Process event following the scan specification
- 1) When '*' is published inside the topic 'pza' then info attribute must be published.
Expand Down
6 changes: 3 additions & 3 deletions platform/panduza_platform/core/platform_driver_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ def __init__(self, parent_platform):
""" Constructor
"""
self.__drivers = {}
self.__platform = parent_platform
self.__log = self.__platform.log
self.platform = parent_platform
self.__log = self.platform.log

# ---

Expand All @@ -34,7 +34,7 @@ def produce_interface(self, bench_name, device, interface_config):
instance = driver_obj()

# Link the instance with its context
instance.set_platform(self.__platform)
instance.set_platform(self.platform)
instance.set_bench_name(bench_name)
instance.set_device(device)
instance.set_device_name(device.get_name())
Expand Down
2 changes: 1 addition & 1 deletion platform/panduza_platform/core/platform_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import asyncio
import threading

from log.thread import thread_logger
from platform.panduza_platform.log.device import thread_logger



Expand Down
5 changes: 0 additions & 5 deletions platform/panduza_platform/core/platform_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ def reset_work_time(self):

# ---

def name(self):
return self.PZA_WORKER_name()

# ---

def stop(self):
self.__alive = False

Expand Down
13 changes: 8 additions & 5 deletions platform/panduza_platform/devices/panduza/server/itf_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ def _PZA_DRV_config(self):
async def _PZA_DRV_loop_init(self):
"""From PlatformDriver
"""

self.log.info("!!! inin interface ???????????????????????????")

# Update the number of managed interface
await self._update_attribute("info", "interfaces", self.platform.get_interface_number())

await self._update_attributes_from_dict({
"info": {
"number_of_devices": self.platform.get_number_of_device(),
}
})
# await self._update_attributes_from_dict({
# "info": {
# "number_of_devices": self.platform.get_number_of_device(),
# }
# })

# Tell the platform that the init state end sucessfuly
self._PZA_DRV_init_success()
Expand Down
24 changes: 22 additions & 2 deletions platform/panduza_platform/devices/panduza/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from .itf_platform import InterfacePanduzaPlatform

import asyncio

class DevicePanduzaServer(PlatformDevice):
"""Represent the machine on which the platform is running
"""
Expand All @@ -23,10 +25,28 @@ async def _PZA_DEV_mount_interfaces(self):
"""
"""

self.log.info("mount !!!!!!!!!!!!!!!!!!!")
await asyncio.sleep(2)

self.log.info("mount !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

itf_platform = InterfacePanduzaPlatform()
await self.mount_interface(itf_platform)


itf_platform.set_platform(self.platform)
itf_platform.set_bench_name(self.group_name)
# itf_platform.set_device(device)
# itf_platform.set_device_name(device.get_name())
itf_platform.set_tree({
"name": "test"
})


itf_platform.initialize()

self.log.info(f"mount !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! {self.settings}")


self.mount_interface(itf_platform)

# itf_platform.attach_client()
# load_worker()
Expand Down

0 comments on commit 908f3ae

Please sign in to comment.