diff --git a/xdevs/plugins/input_handlers/csv_input_handler.py b/xdevs/plugins/input_handlers/csv_input_handler.py index ebaa4b7..2d8b5b9 100644 --- a/xdevs/plugins/input_handlers/csv_input_handler.py +++ b/xdevs/plugins/input_handlers/csv_input_handler.py @@ -1,7 +1,6 @@ import csv import sys import time -from typing import Callable, Any from xdevs.rt_sim.input_handler import InputHandler @@ -49,12 +48,5 @@ def run(self): if not port: print(f'LINE {i + 1}: port ID is empty. Row will be ignored', file=sys.stderr) continue - # 4. parse message - try: - # if parser is not defined, we forward the message as is (i.e., in string format) - msg = self.msg_parsers.get(port, lambda x: x)(msg) - except Exception: - print(f'LINE {i + 1}: error parsing msg ("{msg}"). Row will be ignored', file=sys.stderr) - continue - # 5. inject event to queue - self.push_to_queue(port, msg) + # 4. inject event to queue + self.push_msg(port, msg) diff --git a/xdevs/plugins/input_handlers/tcp_input_handler.py b/xdevs/plugins/input_handlers/tcp_input_handler.py index 8011215..27aaed2 100644 --- a/xdevs/plugins/input_handlers/tcp_input_handler.py +++ b/xdevs/plugins/input_handlers/tcp_input_handler.py @@ -1,32 +1,26 @@ +from __future__ import annotations import queue import socket import threading - from xdevs.rt_sim.input_handler import InputHandler -# Server in which the ingoing event will be joining the system - -def tcp_format(msg): - """Default function for converting incoming msg to events. Incoming msg must be Port,msg.""" - return msg.decode().split(',') - -def client_handler(client_socket, addr, parser_f, q): +def client_handler(client_socket, addr, q): # TODO no parsea eventos, eso lo hace el input manager """Function to handle each client connection.""" - print(f'Connected to: {addr}') + print(f'Connected to client {addr}') while True: data = client_socket.recv(1024) # No existe valor por defecto. Es obligatorio pasarle un valor. # 1024 por ser potencia de 2 y tener espacio de sobra. if not data: - print(f'Connection closed with {addr}') + print(f'Connection with client {addr} closed') break - data = parser_f(data) # print(f'data to inyect in q is : {data}') q.put(data) # q.put((parser_f(data))) + class TCPInputHandler(InputHandler): def __init__(self, **kwargs): """ @@ -48,26 +42,26 @@ def __init__(self, **kwargs): ingoing event and the second one what is going to be injected in that port. """ super().__init__(**kwargs) + if self.event_parser is None: + self.event_parser = lambda x: x.decode().split(',') - self.host = kwargs.get('host', '0.0.0.0') # 0.0.0.0 -> listening to all interfaces. If the server is in the - # same device use LocalHost + self.host: str = kwargs.get('host', '0.0.0.0') # 0.0.0.0 -> listening to all interfaces. If the server is in the + # same device use LocalHost TODO yo lo dejaría en localhost por defecto (igual que el output handler) - self.port = kwargs.get('port') + self.port: int = kwargs.get('port') if self.port is None: - raise ValueError('PORT is mandatory') - - if self.event_parser is None: - self.event_parser = tcp_format - + raise ValueError('TCP port is mandatory') self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.clients_connected = list() - + self.clients_connected: list[threading.Thread] = list() # A list to handle all the clients msgs self.msg_q = queue.SimpleQueue() def initialize(self): + # TODO hay que revisar este handler. La idea es un poco lo contrario a lo que haces + # TODO en run, debemos encargarnos de meter mensajes de una cola a otra + # TODO Las demás hebras auxiliares harán el truco de las conexiones TCP. self.server_socket.bind((self.host, self.port)) self.server_socket.listen() print('Server listening...') @@ -75,20 +69,19 @@ def initialize(self): t.start() def run(self): + # TODO aqui debería ejecutarse el código de queue handler (ese método no sería necesario). + # TODO Esto de las conexiones sería lo de initialize while True: client_socket, address = self.server_socket.accept() self.clients_connected.append(threading.Thread(target=client_handler, daemon=True, - args=(client_socket, address, self.event_parser, self.msg_q))) + args=(client_socket, address, self.msg_q))) self.clients_connected[len(self.clients_connected)-1].start() def queue_handler(self): """Messages from each client are pushed to the queue.""" while True: - port, msg = self.msg_q.get() - # msg is parser into the port type. - msg = self.msg_parsers.get(port, lambda x: x)(msg) - # print(f'MENSAJE RECIBIDO E INYECTADO: {port, msg}') - self.push_to_queue(port, msg) + event = self.msg_q.get() + self.push_event(event) if __name__ == '__main__': diff --git a/xdevs/plugins/output_handlers/tcp_output_handler.py b/xdevs/plugins/output_handlers/tcp_output_handler.py index 05e2955..eee627c 100644 --- a/xdevs/plugins/output_handlers/tcp_output_handler.py +++ b/xdevs/plugins/output_handlers/tcp_output_handler.py @@ -1,6 +1,7 @@ import socket import time import threading +from typing import Any, Callable from xdevs.rt_sim.output_handler import OutputHandler @@ -31,46 +32,39 @@ def __init__(self, **kwargs): :param int port: is the port in which the host is listening. :param float t_wait: is the time (in s) for trying to reconnect to the server if a ConnectionRefusedError exception occurs. Default is 10 s. - :param Callable[[Any, Any], str] event_parser: A function that determines the format of outgoing events. By + :param Callable[[str, Any], str] event_parser: A function that determines the format of outgoing events. By default, the format is 'port,msg', where 'port' is the name of the port in which an event occurred, and 'msg' is the message given by the port. """ - super().__init__() + super().__init__(**kwargs) - self.host = kwargs.get('host', 'LocalHost') - - self.port = kwargs.get('port') + self.host: str = kwargs.get('host', 'LocalHost') + self.port: int = kwargs.get('port') if self.port is None: - raise ValueError('port is mandatory') - - self.t_wait = kwargs.get('t_wait', 10) + raise ValueError('TCP port is mandatory') + self.t_wait: float = kwargs.get('t_wait', 10) - self.event_parser = kwargs.get('event_parser', tcp_default_format) + self.event_parser: Callable[[str, Any], str] = kwargs.get('event_parser', lambda port, msg: f'{port},{msg}') self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - self.is_connected = False + self.is_connected: bool = False def exit(self): - print(f'CLosing client to server {self.host} in port {self.port}...') + print(f'Closing client to server {self.host} in port {self.port}...') self.client_socket.close() self.is_connected = False def run(self): while True: - # First we check for outgoing events - port, msg = self.queue.get() + # Wait for an outgoing event + event = self.pop_event() try: if self.is_connected: - - # If an outgoing event occurs it is sent to the server ,but first it is formatted. - data = self.event_parser(port, msg) - if self.client_socket.fileno() > 0: # We can only send data if the client_socket is not close. Client_socket is closed when # .fileno() return 0 - self.client_socket.sendall(data.encode()) + self.client_socket.sendall(event.encode()) else: try: @@ -84,7 +78,8 @@ def run(self): # This exception can be raised when: the port is blocked or closed by a firewall, host is not # available or close, among others. print(f'Connection refused, trying again in {self.t_wait} s. ') - time.sleep(self.t_wait) + time.sleep(self.t_wait) # TODO no sleep, guarda en una variable el tiempo de cooldown + # TODO Si no, se nos acumulan mensajes en la cola! except OSError as e: # If a system error occurred when connecting, we assume that the server has been shut down. diff --git a/xdevs/rt_sim/input_handler.py b/xdevs/rt_sim/input_handler.py index f841dd1..b49efe3 100644 --- a/xdevs/rt_sim/input_handler.py +++ b/xdevs/rt_sim/input_handler.py @@ -1,5 +1,7 @@ +from __future__ import annotations from abc import ABC, abstractmethod from typing import ClassVar, Type, Callable, Any +import sys import pkg_resources @@ -9,23 +11,17 @@ def __init__(self, **kwargs): Handler interface for injecting external events to the system. :param queue: used to collect and inject all external events joining the system. - :param event_parser: # TODO + :param Callable[[Any], tuple[str, str]] event_parser: event parser function. It transforms incoming events + into tuples (port, message). Note that both are represented as strings. Messages need further parsing. :param dict[str, Callable[[str], Any]] msg_parsers: message parsers. Keys are port names, and values are functions that take a string and returns an object of the corresponding port type. If a parser is not defined, the input handler assumes that the port type is str and forward the message as is. By default, all the ports are assumed to accept str objects. - """ self.queue = kwargs.get('queue') if self.queue is None: raise ValueError('queue is mandatory') - - # event_parser que va a ser para el tcp-syst una unica funcion. - - # Ver si lo pongo en la padre o especifico para cada implementación. Puede necesitarsse una función por - # implementacion, pero se llamarán igual "event_parser". - self.event_parser = kwargs.get('event_parser', None) - + self.event_parser: Callable[[Any], tuple[str, str]] | None = kwargs.get('event_parser') self.msg_parsers: dict[str, Callable[[str], Any]] = kwargs.get('msg_parsers', dict()) def initialize(self): @@ -41,10 +37,28 @@ def run(self): """Execution of the input handler. It is implementation-specific""" pass - def push_to_queue(self, port, msg): + def push_event(self, event: Any): + """Parses event as tuple port message and pushes msg to the queue.""" + try: + port, msg = self.event_parser(event) + except Exception: + # if an exception is triggered while parsing the event, we ignore it + print(f'error parsing input event ("{event}"). Event will be ignored', file=sys.stderr) + return + self.push_msg(port, msg) + + def push_msg(self, port: str, msg: str): """Adding the port and message to the queue.""" + try: + # if parser is not defined, we forward the message as is (i.e., in string format) + msg = self.msg_parsers.get(port, lambda x: x)(msg) + except Exception: + # if an exception is triggered while parsing the message, we ignore it + print(f'error parsing input msg ("{msg}"). Message will be ignored', file=sys.stderr) + return self.queue.put((port, msg)) + class InputHandlers: _plugins: ClassVar[dict[str, Type[InputHandler]]] = { ep.name: ep.load() for ep in pkg_resources.iter_entry_points('xdevs.plugins.input_handlers') diff --git a/xdevs/rt_sim/output_handler.py b/xdevs/rt_sim/output_handler.py index a9ec08f..0ace2d5 100644 --- a/xdevs/rt_sim/output_handler.py +++ b/xdevs/rt_sim/output_handler.py @@ -1,14 +1,22 @@ +from __future__ import annotations import queue from abc import ABC, abstractmethod -from typing import ClassVar, Type +import sys +from typing import Any, Callable, ClassVar, Type import pkg_resources class OutputHandler(ABC): def __init__(self, **kwargs): - """Handler interface for ejecting internal events from the system.""" + """ + Handler interface for ejecting internal events from the system. + + TODO documentation + """ self.queue = queue.SimpleQueue() + self.event_parser: Callable[[str, str], Any] | None = kwargs.get('event_parser') + self.msg_parsers: dict[str, Callable[[Any], str]] = kwargs.get('msg_parsers', dict()) def initialize(self): """Performs any task before calling the run method. It is implementation-specific. By default, it is empty.""" @@ -23,6 +31,25 @@ def run(self): """Execution of the output handler. It is implementation-specific""" pass + def pop_event(self) -> Any: + while True: + port, msg = self.pop_msg() + try: + event = self.event_parser(port, msg) + except Exception: + print(f'error parsing output event ("{port}","{msg}"). Event will be ignored', file=sys.stderr) + continue + return event + + def pop_msg(self) -> tuple[str, str]: + while True: + port, msg = self.queue.get() + try: + msg = self.msg_parsers.get(port, lambda x: str(x))(msg) + except Exception: + print(f'error parsing output msg ("{msg}"). Message will be ignored', file=sys.stderr) + continue + return port, msg class OutputHandlers: _plugins: ClassVar[dict[str, Type[OutputHandler]]] = {