Skip to content

Commit

Permalink
Review (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
romancardenas committed Apr 27, 2023
1 parent e4b0f44 commit 7dd3ca5
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 69 deletions.
12 changes: 2 additions & 10 deletions xdevs/plugins/input_handlers/csv_input_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import csv
import sys
import time
from typing import Callable, Any
from xdevs.rt_sim.input_handler import InputHandler


Expand Down Expand Up @@ -49,12 +48,5 @@ def run(self):
if not port:
print(f'LINE {i + 1}: port ID is empty. Row will be ignored', file=sys.stderr)
continue
# 4. parse message
try:
# if parser is not defined, we forward the message as is (i.e., in string format)
msg = self.msg_parsers.get(port, lambda x: x)(msg)
except Exception:
print(f'LINE {i + 1}: error parsing msg ("{msg}"). Row will be ignored', file=sys.stderr)
continue
# 5. inject event to queue
self.push_to_queue(port, msg)
# 4. inject event to queue
self.push_msg(port, msg)
47 changes: 20 additions & 27 deletions xdevs/plugins/input_handlers/tcp_input_handler.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,26 @@
from __future__ import annotations
import queue
import socket
import threading

from xdevs.rt_sim.input_handler import InputHandler

# Server in which the ingoing event will be joining the system

def tcp_format(msg):
"""Default function for converting incoming msg to events. Incoming msg must be Port,msg."""
return msg.decode().split(',')


def client_handler(client_socket, addr, parser_f, q):
def client_handler(client_socket, addr, q): # TODO no parsea eventos, eso lo hace el input manager
"""Function to handle each client connection."""

print(f'Connected to: {addr}')
print(f'Connected to client {addr}')
while True:
data = client_socket.recv(1024)
# No existe valor por defecto. Es obligatorio pasarle un valor.
# 1024 por ser potencia de 2 y tener espacio de sobra.
if not data:
print(f'Connection closed with {addr}')
print(f'Connection with client {addr} closed')
break
data = parser_f(data)
# print(f'data to inyect in q is : {data}')
q.put(data)
# q.put((parser_f(data)))


class TCPInputHandler(InputHandler):
def __init__(self, **kwargs):
"""
Expand All @@ -48,47 +42,46 @@ def __init__(self, **kwargs):
ingoing event and the second one what is going to be injected in that port.
"""
super().__init__(**kwargs)
if self.event_parser is None:

This comment has been minimized.

Copy link
@romancardenas

romancardenas Apr 28, 2023

Author Collaborator

ANTES DEL SUPER

if 'event_parser' not in kwargs:
kwargs['event_parser'] = lambda ....

kwargs['event_parser'] = kwargs.get('event_parser', lambda.....)

self.event_parser = lambda x: x.decode().split(',')

self.host = kwargs.get('host', '0.0.0.0') # 0.0.0.0 -> listening to all interfaces. If the server is in the
# same device use LocalHost
self.host: str = kwargs.get('host', '0.0.0.0') # 0.0.0.0 -> listening to all interfaces. If the server is in the
# same device use LocalHost TODO yo lo dejaría en localhost por defecto (igual que el output handler)

self.port = kwargs.get('port')
self.port: int = kwargs.get('port')
if self.port is None:
raise ValueError('PORT is mandatory')

if self.event_parser is None:
self.event_parser = tcp_format

raise ValueError('TCP port is mandatory')

self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.clients_connected = list()

self.clients_connected: list[threading.Thread] = list()

# A list to handle all the clients msgs
self.msg_q = queue.SimpleQueue()

def initialize(self):
# TODO hay que revisar este handler. La idea es un poco lo contrario a lo que haces
# TODO en run, debemos encargarnos de meter mensajes de una cola a otra
# TODO Las demás hebras auxiliares harán el truco de las conexiones TCP.
self.server_socket.bind((self.host, self.port))
self.server_socket.listen()
print('Server listening...')
t = threading.Thread(target=self.queue_handler, daemon=True)
t.start()

def run(self):
# TODO aqui debería ejecutarse el código de queue handler (ese método no sería necesario).
# TODO Esto de las conexiones sería lo de initialize
while True:
client_socket, address = self.server_socket.accept()
self.clients_connected.append(threading.Thread(target=client_handler, daemon=True,
args=(client_socket, address, self.event_parser, self.msg_q)))
args=(client_socket, address, self.msg_q)))
self.clients_connected[len(self.clients_connected)-1].start()

def queue_handler(self):
"""Messages from each client are pushed to the queue."""
while True:
port, msg = self.msg_q.get()
# msg is parser into the port type.
msg = self.msg_parsers.get(port, lambda x: x)(msg)
# print(f'MENSAJE RECIBIDO E INYECTADO: {port, msg}')
self.push_to_queue(port, msg)
event = self.msg_q.get()
self.push_event(event)


if __name__ == '__main__':
Expand Down
35 changes: 15 additions & 20 deletions xdevs/plugins/output_handlers/tcp_output_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import socket
import time
import threading
from typing import Any, Callable

from xdevs.rt_sim.output_handler import OutputHandler

Expand Down Expand Up @@ -31,46 +32,39 @@ def __init__(self, **kwargs):
:param int port: is the port in which the host is listening.
:param float t_wait: is the time (in s) for trying to reconnect to the server if a ConnectionRefusedError
exception occurs. Default is 10 s.
:param Callable[[Any, Any], str] event_parser: A function that determines the format of outgoing events. By
:param Callable[[str, Any], str] event_parser: A function that determines the format of outgoing events. By
default, the format is 'port,msg', where 'port' is the name of the port in which an event occurred, and
'msg' is the message given by the port.
"""
super().__init__()
super().__init__(**kwargs)

self.host = kwargs.get('host', 'LocalHost')

self.port = kwargs.get('port')
self.host: str = kwargs.get('host', 'LocalHost')
self.port: int = kwargs.get('port')
if self.port is None:
raise ValueError('port is mandatory')

self.t_wait = kwargs.get('t_wait', 10)
raise ValueError('TCP port is mandatory')
self.t_wait: float = kwargs.get('t_wait', 10)

self.event_parser = kwargs.get('event_parser', tcp_default_format)
self.event_parser: Callable[[str, Any], str] = kwargs.get('event_parser', lambda port, msg: f'{port},{msg}')

self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

self.is_connected = False
self.is_connected: bool = False

def exit(self):
print(f'CLosing client to server {self.host} in port {self.port}...')
print(f'Closing client to server {self.host} in port {self.port}...')
self.client_socket.close()
self.is_connected = False

def run(self):
while True:
# First we check for outgoing events
port, msg = self.queue.get()
# Wait for an outgoing event
event = self.pop_event()
try:
if self.is_connected:

# If an outgoing event occurs it is sent to the server ,but first it is formatted.
data = self.event_parser(port, msg)

if self.client_socket.fileno() > 0:
# We can only send data if the client_socket is not close. Client_socket is closed when
# .fileno() return 0
self.client_socket.sendall(data.encode())
self.client_socket.sendall(event.encode())
else:
try:

Expand All @@ -84,7 +78,8 @@ def run(self):
# This exception can be raised when: the port is blocked or closed by a firewall, host is not
# available or close, among others.
print(f'Connection refused, trying again in {self.t_wait} s. ')
time.sleep(self.t_wait)
time.sleep(self.t_wait) # TODO no sleep, guarda en una variable el tiempo de cooldown
# TODO Si no, se nos acumulan mensajes en la cola!

except OSError as e:
# If a system error occurred when connecting, we assume that the server has been shut down.
Expand Down
34 changes: 24 additions & 10 deletions xdevs/rt_sim/input_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import ClassVar, Type, Callable, Any
import sys
import pkg_resources


Expand All @@ -9,23 +11,17 @@ def __init__(self, **kwargs):
Handler interface for injecting external events to the system.
:param queue: used to collect and inject all external events joining the system.
:param event_parser: # TODO
:param Callable[[Any], tuple[str, str]] event_parser: event parser function. It transforms incoming events
into tuples (port, message). Note that both are represented as strings. Messages need further parsing.
:param dict[str, Callable[[str], Any]] msg_parsers: message parsers. Keys are port names, and values are
functions that take a string and returns an object of the corresponding port type. If a parser is not
defined, the input handler assumes that the port type is str and forward the message as is. By default, all
the ports are assumed to accept str objects.
"""
self.queue = kwargs.get('queue')
if self.queue is None:
raise ValueError('queue is mandatory')

# event_parser que va a ser para el tcp-syst una unica funcion.

# Ver si lo pongo en la padre o especifico para cada implementación. Puede necesitarsse una función por
# implementacion, pero se llamarán igual "event_parser".
self.event_parser = kwargs.get('event_parser', None)

self.event_parser: Callable[[Any], tuple[str, str]] | None = kwargs.get('event_parser')
self.msg_parsers: dict[str, Callable[[str], Any]] = kwargs.get('msg_parsers', dict())

def initialize(self):
Expand All @@ -41,10 +37,28 @@ def run(self):
"""Execution of the input handler. It is implementation-specific"""
pass

def push_to_queue(self, port, msg):
def push_event(self, event: Any):
"""Parses event as tuple port message and pushes msg to the queue."""
try:
port, msg = self.event_parser(event)
except Exception:
# if an exception is triggered while parsing the event, we ignore it
print(f'error parsing input event ("{event}"). Event will be ignored', file=sys.stderr)
return
self.push_msg(port, msg)

def push_msg(self, port: str, msg: str):
"""Adding the port and message to the queue."""
try:
# if parser is not defined, we forward the message as is (i.e., in string format)
msg = self.msg_parsers.get(port, lambda x: x)(msg)
except Exception:
# if an exception is triggered while parsing the message, we ignore it
print(f'error parsing input msg ("{msg}"). Message will be ignored', file=sys.stderr)
return
self.queue.put((port, msg))


class InputHandlers:
_plugins: ClassVar[dict[str, Type[InputHandler]]] = {
ep.name: ep.load() for ep in pkg_resources.iter_entry_points('xdevs.plugins.input_handlers')
Expand Down
31 changes: 29 additions & 2 deletions xdevs/rt_sim/output_handler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
from __future__ import annotations
import queue
from abc import ABC, abstractmethod
from typing import ClassVar, Type
import sys
from typing import Any, Callable, ClassVar, Type

import pkg_resources


class OutputHandler(ABC):
def __init__(self, **kwargs):
"""Handler interface for ejecting internal events from the system."""
"""
Handler interface for ejecting internal events from the system.
TODO documentation
"""
self.queue = queue.SimpleQueue()
self.event_parser: Callable[[str, str], Any] | None = kwargs.get('event_parser')
self.msg_parsers: dict[str, Callable[[Any], str]] = kwargs.get('msg_parsers', dict())

def initialize(self):
"""Performs any task before calling the run method. It is implementation-specific. By default, it is empty."""
Expand All @@ -23,6 +31,25 @@ def run(self):
"""Execution of the output handler. It is implementation-specific"""
pass

def pop_event(self) -> Any:
while True:
port, msg = self.pop_msg()
try:
event = self.event_parser(port, msg)
except Exception:
print(f'error parsing output event ("{port}","{msg}"). Event will be ignored', file=sys.stderr)
continue
return event

def pop_msg(self) -> tuple[str, str]:
while True:
port, msg = self.queue.get()
try:
msg = self.msg_parsers.get(port, lambda x: str(x))(msg)
except Exception:
print(f'error parsing output msg ("{msg}"). Message will be ignored', file=sys.stderr)
continue
return port, msg

class OutputHandlers:
_plugins: ClassVar[dict[str, Type[OutputHandler]]] = {
Expand Down

0 comments on commit 7dd3ca5

Please sign in to comment.