From 20f9695a241bbd849a268295c456a592fea8a2a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rom=C3=A1n=20C=C3=A1rdenas=20Rodr=C3=ADguez?= Date: Tue, 25 Jun 2024 15:54:08 +0200 Subject: [PATCH] Add Cell-DEVS support --- pyproject.toml | 5 + xdevs/abc/__init__.py | 1 + xdevs/abc/celldevs.py | 58 ++++ xdevs/abc/transducer.py | 4 +- xdevs/celldevs/__init__.py | 6 + xdevs/celldevs/cell.py | 192 +++++++++++++ xdevs/celldevs/coupled.py | 90 ++++++ xdevs/celldevs/grid.py | 288 ++++++++++++++++++++ xdevs/celldevs/inout.py | 52 ++++ xdevs/examples/async_rt/basic.py | 271 ------------------ xdevs/examples/async_rt/csv_output_v3.csv | 40 --- xdevs/examples/async_rt/prueba.csv | 10 - xdevs/examples/celldevs_sir/__init__.py | 0 xdevs/examples/celldevs_sir/main.py | 37 +++ xdevs/examples/celldevs_sir/scenario.json | 49 ++++ xdevs/examples/celldevs_sir/sir_cell.py | 68 +++++ xdevs/examples/celldevs_sir/sir_coupled.py | 36 +++ xdevs/examples/celldevs_sir/sir_sink.py | 53 ++++ xdevs/factory.py | 22 +- xdevs/plugins/celldevs_outputs/__init__.py | 0 xdevs/plugins/celldevs_outputs/hybrid.py | 26 ++ xdevs/plugins/celldevs_outputs/inertial.py | 22 ++ xdevs/plugins/celldevs_outputs/transport.py | 27 ++ 23 files changed, 1033 insertions(+), 324 deletions(-) create mode 100644 xdevs/abc/celldevs.py create mode 100644 xdevs/celldevs/__init__.py create mode 100644 xdevs/celldevs/cell.py create mode 100644 xdevs/celldevs/coupled.py create mode 100644 xdevs/celldevs/grid.py create mode 100644 xdevs/celldevs/inout.py delete mode 100644 xdevs/examples/async_rt/basic.py delete mode 100644 xdevs/examples/async_rt/csv_output_v3.csv delete mode 100644 xdevs/examples/async_rt/prueba.csv create mode 100644 xdevs/examples/celldevs_sir/__init__.py create mode 100644 xdevs/examples/celldevs_sir/main.py create mode 100644 xdevs/examples/celldevs_sir/scenario.json create mode 100644 xdevs/examples/celldevs_sir/sir_cell.py create mode 100644 xdevs/examples/celldevs_sir/sir_coupled.py create mode 100644 xdevs/examples/celldevs_sir/sir_sink.py create mode 100644 xdevs/plugins/celldevs_outputs/__init__.py create mode 100644 xdevs/plugins/celldevs_outputs/hybrid.py create mode 100644 xdevs/plugins/celldevs_outputs/inertial.py create mode 100644 xdevs/plugins/celldevs_outputs/transport.py diff --git a/pyproject.toml b/pyproject.toml index 0509310..2d6b8ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,11 @@ efp = "xdevs.examples.gpt.efp:Efp" [project.entry-points."xdevs.wrappers"] pypdevs = "xdevs.plugins.wrappers.pypdevs:PyPDEVSWrapper" +[project.entry-points."xdevs.celldevs_outputs"] +hybrid = "xdevs.plugins.celldevs_outputs.hybrid:HybridDelayedOutput" +inertial = "xdevs.plugins.celldevs_outputs.inertial:InertialDelayedOutput" +transport = "xdevs.plugins.celldevs_outputs.transport:TransportDelayedOutput" + [tool.setuptools] include-package-data = false diff --git a/xdevs/abc/__init__.py b/xdevs/abc/__init__.py index ce2b8cf..8882ca6 100644 --- a/xdevs/abc/__init__.py +++ b/xdevs/abc/__init__.py @@ -1,2 +1,3 @@ +from .celldevs import DelayedOutput from .handler import InputHandler, OutputHandler from .transducer import Transducer diff --git a/xdevs/abc/celldevs.py b/xdevs/abc/celldevs.py new file mode 100644 index 0000000..195b336 --- /dev/null +++ b/xdevs/abc/celldevs.py @@ -0,0 +1,58 @@ +from abc import ABC, abstractmethod +from typing import Generic +from xdevs import INFINITY +from xdevs.models import Port +from xdevs.celldevs import C, S + + +class DelayedOutput(Generic[C, S], ABC): + def __init__(self, cell_id: C, serve: bool = False): + """ + Cell-DEVS delayed output port. This is an abstract base class. + :param cell_id: ID of the cell that owns this delayed output. + :param serve: set to True if the port is going to be accessible via RPC server. Defaults to False. + """ + from xdevs.celldevs.inout import CellMessage + self.cell_id = cell_id + self.port: Port[CellMessage[C, S]] = Port(CellMessage, 'out_celldevs', serve) + + @abstractmethod + def add_to_buffer(self, when: float, state: S): + """ + Schedules a cell state to send events. + :param when: time at which the events must be sent. + :param state: cell state. Events will be obtained by mapping this state. + """ + pass + + @abstractmethod + def next_time(self) -> float: + """:return: next time at which events must be sent.""" + pass + + @abstractmethod + def next_state(self) -> S: + """:return: next cell state used to generate events.""" + pass + + @abstractmethod + def pop_state(self): + """removes schedule state from the delayed output.""" + pass + + def send_events(self, time: float): + """ + If there is an scheduled state, it sends a new event via every Cell-DEVS output port. + :param time: current simulation time. + """ + from xdevs.celldevs.inout import CellMessage + if self.next_time() <= time: + self.port.add(CellMessage(self.cell_id, self.next_state())) + + def clean(self, time: float): + """ + It cleans all the outdated scheduled cell states. + :param time: current simulation time. + """ + while self.next_time() < INFINITY and self.next_time() <= time: + self.pop_state() diff --git a/xdevs/abc/transducer.py b/xdevs/abc/transducer.py index c88d785..7154a72 100644 --- a/xdevs/abc/transducer.py +++ b/xdevs/abc/transducer.py @@ -10,9 +10,9 @@ class Transducible(ABC): - @staticmethod + @classmethod @abstractmethod - def transducer_map() -> dict[str, tuple[Type[T], Callable[[Any], T]]]: + def transducer_map(cls) -> dict[str, tuple[Type[T], Callable[[Any], T]]]: pass diff --git a/xdevs/celldevs/__init__.py b/xdevs/celldevs/__init__.py new file mode 100644 index 0000000..6d7d5bd --- /dev/null +++ b/xdevs/celldevs/__init__.py @@ -0,0 +1,6 @@ +from __future__ import annotations +from typing import TypeVar + +C = TypeVar('C') # Variable type used for cell IDs +S = TypeVar('S') # Variable type used for cell states +V = TypeVar('V') # Variable type used for cell vicinities diff --git a/xdevs/celldevs/cell.py b/xdevs/celldevs/cell.py new file mode 100644 index 0000000..dcf69f1 --- /dev/null +++ b/xdevs/celldevs/cell.py @@ -0,0 +1,192 @@ +from __future__ import annotations +from abc import ABC, abstractmethod +from copy import deepcopy +from typing import Any, Generic +from xdevs.celldevs import C, S, V +from xdevs.celldevs.inout import CellMessage, InPort +from xdevs.models import Atomic +from xdevs.factory import DelayedOutputs, DelayedOutput + + +class CellConfig(Generic[C, S, V]): + def __init__(self, config_id: str, c_type: type[C], s_type: type[S], v_type: type[V], **kwargs): + """ + Cell-DEVS configuration structure. + :param config_id: identifier of the configuration. + :param c_type: type used to identify cells. + :param s_type: type used to represent cell states. + :param v_type: type used to represent vicinity between cells. + :param cell_type: identifier of the cell type. + :param delay: identifier of the delay buffer implemented by the cell. By default, it is set to inertial. + :param config: any additional configuration parameters. + :param state: parameters required to create the initial state of the cell. + :param neighborhood: representation of the cell neighborhood. By default, it is empty. + :param cell_map: list of cells that implement this configuration. By default, it is empty. + :param eic: list of external input couplings. By default, it is empty. + :param eoc: list of external output couplings. By default, it is empty. + """ + self.config_id: str = config_id + self.c_type: type[C] = c_type + self.s_type: type[S] = s_type + self.v_type: type[V] = v_type + CellMessage.state_t = s_type + + self.cell_type: str = kwargs['cell_type'] + self.delay_type: str = kwargs.get('delay', 'inertial') + self.cell_config = kwargs.get('config') + self.state = kwargs.get('state') + self.raw_neighborhood: list[dict] = kwargs.get('neighborhood', list()) + self.cell_map: list[C] | None = None if self.default else self._load_map(*kwargs.get('cell_map', list())) + self.eic: list[tuple[str, str]] = self._parse_couplings(kwargs.get('eic', list())) + self.ic: list[tuple[str, str]] = [('out_celldevs', 'in_celldevs')] + self.eoc: list[tuple[str, str]] = self._parse_couplings(kwargs.get('eoc', list())) + + @property + def default(self) -> bool: + """:return: true if this configuration profile is the default one.""" + return self.config_id == 'default' + + def apply_patch(self, config_id: str, **kwargs): + """ + Applies a configuration patch. This method is used for non-default configurations. + :param config_id: configuration ID. + :param cell_type: identifier of the cell type. + :param delay: identifier of the delay buffer implemented by the cell. By default, it is set to inertial. + :param config: any additional configuration parameters. + :param state: parameters required to create the initial state of the cell. + :param neighborhood: representation of the cell neighborhood. By default, it is empty. + :param cell_map: list of cells that implement this configuration. By default, it is empty. + :param eic: list of external input couplings. By default, it is empty. + :param ic: list of internal couplings. By default, it is empty. # TODO remove this? + :param eoc: list of external output couplings. By default, it is empty. + """ + self.config_id = config_id + self.cell_type = kwargs.get('cell_type', self.cell_type) + self.delay_type = kwargs.get('delay', self.delay_type) + if 'config' in kwargs: + self.cell_config = self._patch_dict(self.cell_config, kwargs['config']) \ + if isinstance(self.cell_config, dict) else kwargs['config'] + if 'state' in kwargs: + self.state = self._patch_dict(self.state, kwargs['state']) \ + if isinstance(self.state, dict) else kwargs['state'] + self.raw_neighborhood = kwargs.get('neighborhood', self.raw_neighborhood) + if 'cell_map' in kwargs: + self.cell_map = self._load_map(*kwargs['cell_map']) + if 'eic' in kwargs: + self.eic = self._parse_couplings(kwargs['eic']) + if 'ic' in kwargs: + self.ic = self._parse_couplings(kwargs['ic']) + if 'eoc' in kwargs: + self.eoc = self._parse_couplings(kwargs['eoc']) + + def load_state(self) -> S: + """:return: a new initial state structure.""" + return self._load_value(self.s_type, self.state) + + def load_neighborhood(self) -> dict[C, V]: + """:return: a new neighborhood.""" + neighbors: dict[C, V] = dict() + for neighborhood in self.raw_neighborhood: + for neighbor, vicinity in neighborhood.items(): + neighbors[self.c_type(neighbor)] = self._load_vicinity(vicinity) + return neighbors + + def _load_map(self, *args) -> list[C]: + return [self.c_type(self.config_id)] + + def _load_vicinity(self, vicinity: Any): + return self._load_value(self.v_type, vicinity) + + @classmethod + def _patch_dict(cls, d: dict, patch: dict) -> dict: + for k, v in patch.items(): + d[k] = cls._patch_dict(d[k], v) if isinstance(v, dict) and k in d and isinstance(d[k], dict) else v + return d + + @staticmethod + def _parse_couplings(couplings: list[list[str]]) -> list[tuple[str, str]]: + return [(coupling[0], coupling[1]) for coupling in couplings] + + @staticmethod + def _load_value(t_type, params: Any): + params = deepcopy(params) + if isinstance(params, dict): + return t_type(**params) + elif isinstance(params, list): + return t_type(*params) + elif params is not None: + return t_type(params) + return t_type() + + +class Cell(Atomic, ABC, Generic[C, S, V]): + def __init__(self, cell_id: C, config: CellConfig[C, S, V]): + """ + Abstract Base Class for a Cell-DEVS cell. + :param cell_id: cell identifier. + :param config: cell configuration structure. + """ + super().__init__(str(cell_id)) + self._clock: float = 0 + self._config: CellConfig = config + self.ics = config.eic + self.cell_id: C = cell_id + self.cell_state: S = config.load_state() + self.neighborhood: dict[C, V] = self._load_neighborhood() + + self.in_celldevs: InPort[C, S] = InPort(self.cell_id) + self.out_celldevs: DelayedOutput[C, S] = DelayedOutputs.create_delayed_output(config.delay_type, self.cell_id) + self.add_in_port(self.in_celldevs.port) + self.add_out_port(self.out_celldevs.port) + + @property + def neighbors_state(self) -> dict[C, S]: + return self.in_celldevs.history + + @abstractmethod + def local_computation(self, cell_state: S) -> S: + """ + Computes new cell state depending on its previous state. + :param cell_state: current cell state. + :return: new cell state. + """ + pass + + @abstractmethod + def output_delay(self, cell_state: S) -> float: + """ + Returns delay to be applied to output messages related to new cell state. + :param cell_state: new cell state. + :return: delay to be applied. + """ + pass + + def deltint(self): + self._clock += self.sigma + self.out_celldevs.clean(self._clock) + self.sigma = self.out_celldevs.next_time() - self._clock + + def deltext(self, e: float): + self._clock += e + self.sigma -= e + self.in_celldevs.read_new_events() + + new_state = self.local_computation(deepcopy(self.cell_state)) + if new_state != self.cell_state: + state = deepcopy(new_state) + self.out_celldevs.add_to_buffer(self._clock + self.output_delay(state), state) + self.sigma = self.out_celldevs.next_time() - self._clock + self.cell_state = new_state + + def lambdaf(self): + self.out_celldevs.send_events(self._clock + self.sigma) + + def initialize(self): + self.out_celldevs.add_to_buffer(0, self.cell_state) + self.activate() + + def exit(self): + pass + + def _load_neighborhood(self) -> dict[C, V]: + return self._config.load_neighborhood() diff --git a/xdevs/celldevs/coupled.py b/xdevs/celldevs/coupled.py new file mode 100644 index 0000000..fa25e00 --- /dev/null +++ b/xdevs/celldevs/coupled.py @@ -0,0 +1,90 @@ +from __future__ import annotations +import json +from abc import abstractmethod, ABC +from copy import deepcopy +from typing import Dict, Generic, Optional, Tuple, Type +from xdevs.celldevs import C, S, V +from xdevs.celldevs.cell import Cell, CellConfig +from xdevs.celldevs.grid import GridCell, GridCellConfig, GridScenario +from xdevs.models import Coupled + + +class CoupledCellDEVS(Coupled, ABC, Generic[C, S, V]): + def __init__(self, c_type: Type[C], s_type: Type[S], v_type: Type[V], config_file: str, name: Optional[str] = None): + super().__init__(name) + self.c_type: Type[C] = c_type + self.s_type: Type[S] = s_type + self.v_type: Type[V] = v_type + with open(config_file) as file: + self.raw_config = json.load(file) + self._configs: Dict[str, CellConfig[C, S, V]] = dict() + self._cells: Dict[C, Tuple[Cell[C, S, V], CellConfig]] = dict() + + def load_config(self): + raw_configs = self.raw_config['cells'] + default_config: CellConfig[C, S, V] = self._load_default_config(raw_configs['default']) + self._configs = {'default': default_config} + for config_id, raw_config in raw_configs.items(): + if config_id != 'default': + config = deepcopy(default_config) + config.apply_patch(config_id, **raw_config) + self._configs[config_id] = config + + def load_cells(self): + for cell_config in self._configs.values(): + if not cell_config.default: + for cell_id in cell_config.cell_map: + if cell_id in self._cells: + raise ValueError('cell with the same ID already exists') + cell: Cell[C, S, V] = self.create_cell(cell_config.cell_type, cell_id, cell_config) + self._cells[cell_id] = (cell, cell_config) + self.add_component(cell) + + def load_couplings(self): + for cell_to, cell_config in self._cells.values(): + for port_from, port_to in cell_config.eic: + self.add_coupling(self.get_in_port(port_from), cell_to.get_in_port(port_to)) + for neighbor in cell_to.neighborhood: + cell_from = self._cells[neighbor][0] + for port_from, port_to in cell_config.ic: + self.add_coupling(cell_from.get_out_port(port_from), cell_to.get_in_port(port_to)) + for port_from, port_to in cell_config.eoc: + self.add_coupling(cell_to.get_out_port(port_from), self.get_out_port(port_to)) + + def _load_default_config(self, raw_config: Dict) -> CellConfig[C, S, V]: + return CellConfig('default', self.c_type, self.s_type, self.v_type, **raw_config) + + @abstractmethod + def create_cell(self, cell_type: str, cell_id: C, cell_config: CellConfig[C, S, V]) -> Cell[C, S, V]: + pass + + +class CoupledGridCellDEVS(CoupledCellDEVS[Tuple[int, ...], S, V], ABC, Generic[S, V]): + + _configs: Dict[str, GridCellConfig] + + def __init__(self, s_type: Type[S], v_type: Type[V], config_file: str): + super().__init__(tuple, s_type, v_type, config_file) + + scenario_config = self.raw_config['scenario'] + shape = tuple(scenario_config['shape']) + origin = tuple(scenario_config['origin']) if 'origin' in scenario_config else None + wrapped = scenario_config.get('wrapped', False) + self.scenario: GridScenario = GridScenario(shape, origin, wrapped) + + def load_cells(self): + super().load_cells() + default_config = self._configs['default'] + for cell_id in self.scenario.iter_cells(): + if cell_id not in self._cells: + cell: GridCell[S, V] = self.create_cell(default_config.cell_type, cell_id, default_config) + self._cells[cell_id] = (cell, default_config) + self.add_component(cell) + + def _load_default_config(self, raw_config: Dict) -> GridCellConfig[S, V]: + return GridCellConfig(self.scenario, 'default', self.s_type, self.v_type, **raw_config) + + @abstractmethod + def create_cell(self, cell_type: str, cell_id: Tuple[int, ...], + cell_config: GridCellConfig[S, V]) -> GridCell[S, V]: + pass diff --git a/xdevs/celldevs/grid.py b/xdevs/celldevs/grid.py new file mode 100644 index 0000000..27f28fa --- /dev/null +++ b/xdevs/celldevs/grid.py @@ -0,0 +1,288 @@ +from __future__ import annotations +import math +from abc import ABC +from math import copysign, isinf +from typing import Dict, Generic, Iterator, List, Optional, Tuple, Type, Union +from xdevs.celldevs import S, V +from xdevs.celldevs.cell import Cell, CellConfig + +C = Tuple[int, ...] # Cell IDs in grids are tuples of integers + + +class GridScenario: + def __init__(self, shape: C, origin: Optional[C] = None, wrapped: bool = False): + """ + Grid scenario configuration. + :param shape: tuple describing the dimension of the scenario. + :param origin: tuple describing the origin of the scenario. By default, it is set to (0, 0, ...). + :param wrapped: if true, the scenario wraps the edges. It defaults to False. + """ + if len(shape) < 1: + raise ValueError('scenario dimension is invalid') + for dim in shape: + if dim <= 0: + raise ValueError('scenario shape is invalid') + self.shape = tuple(shape) + if origin is None: + origin = tuple(0 for _ in range(self.dimension)) + if len(origin) != len(shape): + raise ValueError('scenario shape and origin must have the same dimension') + self.origin = tuple(origin) + self.wrapped = wrapped + + @property + def dimension(self) -> int: + """:return: number of dimensions of the scenario.""" + return len(self.shape) + + def cell_in_scenario(self, cell: C) -> bool: + """ + Checks if a cell is inside the scenario. + :param cell: coordinates of the cell under study. + :return: True if the coordinates of the cell are inside the scenario. + """ + return self._cell_in_scenario(cell, self.shape, self.origin) + + def distance_vector(self, cell_from: C, cell_to: C) -> C: + """ + Computes the distance vector between two cells. + :param cell_from: origin cell. + :param cell_to: destination cell. + :return: relative distance vector. + """ + return self._distance_vector(cell_from, cell_to, self.shape, self.origin, self.wrapped) + + def cell_to(self, cell_from: C, distance_vector: C): + """ + Deduces destination cell according to an origin cell and a distance vector. + :param cell_from: origin cell. + :param distance_vector: distance vector. + :return: destination cell. + """ + if not self.cell_in_scenario(cell_from): + raise ValueError('cell_from is not part of the scenario') + elif len(distance_vector) != self.dimension: + raise ValueError('scenario shape and distance_vector must have the same dimension') + cell_to: C = tuple(cell_from[i] + distance_vector[i] for i in range(self.dimension)) + if self.wrapped: + cell_to = tuple((cell_to[i] + self.shape[i]) % self.shape[i] for i in range(self.dimension)) + if not self.cell_in_scenario(cell_to): + raise OverflowError('cell_to is not part of the scenario') + return cell_to + + def minkowski_distance(self, p: int, cell_from: C, cell_to: C) -> Union[int, float]: + """ + Computes Minkowski distance between two cells. + :param p: Minkowski distance order. + :param cell_from: origin cell. + :param cell_to: destination cell. + :return: Minkowski distance between cells. + """ + return self._minkowski_distance(p, cell_from, cell_to, self.shape, self.origin, self.wrapped) + + def moore_neighborhood(self, r: int = 1) -> List[C]: + """ + Creates a Moore neighborhood of the desired range. + :param r: neighborhood range. + :return: List with relative distance vectors of neighbors. + """ + return self._moore_neighborhood(self.dimension, r) + + def von_neumann_neighborhood(self, r: int = 1) -> List[C]: + """ + Creates a von Neumann neighborhood of the desired range. + :param r: neighborhood range. + :return: List with relative distance vectors of neighbors. + """ + return self._von_neumann_neighborhood(self.dimension, r) + + def iter_cells(self) -> Iterator[C]: + """:return: iterator that goes through all the cells in the scenario.""" + return self._iter_cells(self.shape, self.origin) + + @staticmethod + def _cell_in_scenario(cell: C, shape: C, origin: C) -> bool: + if len(cell) != len(shape): + raise ValueError('scenario shape and cell location must have the same dimension') + return all(0 <= cell[i] - origin[i] < shape[i] for i in range(len(shape))) + + @classmethod + def _distance_vector(cls, cell_from: C, cell_to: C, shape: C, origin: C, wrapped: bool) -> C: + if not cls._cell_in_scenario(cell_from, shape, origin) or not cls._cell_in_scenario(cell_to, shape, origin): + raise ValueError('cell_from and/or cell_to are not part of the scenario') + dimension = len(shape) + distance: C = tuple(cell_to[i] - cell_from[i] for i in range(dimension)) + if wrapped: + distance = tuple(distance[i] - copysign(shape[i], distance[i]) if abs(distance[i]) > shape[i] / 2 + else distance[i] for i in range(dimension)) + return distance + + @classmethod + def _minkowski_distance(cls, p: int, cell_from: C, cell_to: C, shape: C, + origin: C, wrapped: bool) -> Union[int, float]: + if p <= 0: + raise ValueError('Minkowski distance is only valid for p greater than 0') + d_vector: C = cls._distance_vector(cell_from, cell_to, shape, origin, wrapped) + if p == 1: + return sum(abs(d) for d in d_vector) + elif isinf(p): + return max(abs(d) for d in d_vector) + else: + return sum(abs(d) ** p for d in d_vector) ** (1 / p) + + @classmethod + def _moore_neighborhood(cls, dim: int, r: int) -> List[C]: + if dim < 0: + raise ValueError('invalid number of dimensions') + if r < 0: + raise ValueError('neighborhood range must be greater than or equal to 0') + n_shape: C = tuple(2 * r + 1 for _ in range(dim)) + n_origin: C = tuple(-r for _ in range(dim)) + return list(cls._iter_cells(n_shape, n_origin)) + + @classmethod + def _von_neumann_neighborhood(cls, dim: int, r: int) -> List[C]: + moore: List[C] = cls._moore_neighborhood(dim, r) + n_shape: C = tuple(2 * r + 1 for _ in range(dim)) + n_origin: C = tuple(-r for _ in range(dim)) + center: C = tuple(0 for _ in range(dim)) + neighborhood: List[C] = list() + for neighbor in moore: + if cls._minkowski_distance(1, center, neighbor, n_shape, n_origin, False) <= r: + neighborhood.append(neighbor) + return neighborhood + + @classmethod + def _next_cell(cls, prev_cell: C, shape: C, origin: C, d: int) -> Optional[C]: + if cls._cell_in_scenario(prev_cell, shape, origin): + if prev_cell[d] - origin[d] < shape[d] - 1: + return tuple(prev_cell[i] if i != d else prev_cell[i] + 1 for i in range(len(shape))) + elif d < len(shape) - 1: + prev_cell = tuple(prev_cell[i] if i != d else origin[i] for i in range(len(shape))) + return cls._next_cell(prev_cell, shape, origin, d + 1) + + @classmethod + def _iter_cells(cls, shape: C, origin: C) -> Iterator[C]: + cell: C = origin + while cell is not None: + yield cell + cell = cls._next_cell(cell, shape, origin, 0) + + +class GridCellConfig(CellConfig[C, S, V], Generic[S, V]): + def __init__(self, scenario: GridScenario, config_id: str, s_type: Type[S], v_type: Type[V], **kwargs): + """ + Grid cell configuration structure. + :param scenario: grid scenario structure. + :param config_id: identifier of the configuration. + :param s_type: type used to represent cell states. + :param v_type: type used to represent vicinity between cells. + :param kwargs: any additional configuration parameters required for creating a cell configuration structure. + """ + self.scenario: GridScenario = scenario + super().__init__(config_id, tuple, s_type, v_type, **kwargs) + + def _load_map(self, *args) -> List[C]: + return [tuple(cell_id) for cell_id in args] + + def load_cell_neighborhood(self, cell: C) -> Dict[C, V]: + """ + Creates the neighborhood corresponding to a given cell. + :param cell: target cell tu create the neighborhood. + :return: dictionary {neighbor cell: vicinity} + """ + neighbors: Dict[C, V] = dict() + for neighborhood in self.raw_neighborhood: + vicinity = neighborhood.get('vicinity') + n_type: str = neighborhood.get('type', 'absolute') + if n_type == 'absolute': + for neighbor in neighborhood.get('neighbors', list()): + neighbor = tuple(neighbor) + if not self.scenario.cell_in_scenario(neighbor): + raise OverflowError('absolute neighbor is not part of the scenario') + neighbors[neighbor] = self._load_vicinity(vicinity) + else: + if n_type == 'relative': + relative: List[C] = [tuple(neighbor) for neighbor in neighborhood.get('neighbors', list())] + elif n_type == 'moore': + relative: List[C] = self.scenario.moore_neighborhood(neighborhood.get('range', 1)) + elif n_type == 'von_neumann': + relative: List[C] = self.scenario.von_neumann_neighborhood(neighborhood.get('range', 1)) + else: + raise ValueError('unknown neighborhood type') + for neighbor in relative: + try: + neighbors[self.scenario.cell_to(cell, tuple(neighbor))] = self._load_vicinity(vicinity) + except OverflowError: + continue + return neighbors + + +class GridCell(Cell[C, S, V], ABC, Generic[S, V]): + + _config: GridCellConfig[S, V] + + def __init__(self, cell_id: C, config: GridCellConfig): + """ + Grid Cell class for Cell-DEVS scenarios. + :param config: configuration structure for grid cells. + """ + super().__init__(cell_id, config) + self.scenario = config.scenario + + @property + def location(self) -> C: + """:return: location of the cell.""" + return self.cell_id + + def minkowski_distance(self, p: int, other: C) -> float: + """ + Computes Minkowski distance from cell to another cell. + :param p: Minkowski distance order. + :param other: destination cell. + :return: Minkowski distance. + """ + return self.scenario.minkowski_distance(p, self.location, other) + + def manhattan_distance(self, other: C) -> int: + """ + Computes Manhattan distance from cell to another cell + :param other: destination cell. + :return: Manhattan distance. + """ + return self.scenario.minkowski_distance(1, self.location, other) + + def euclidean_distance(self, other: C) -> float: + """ + Computes Euclidean distance from cell to another cell. + :param other: destination cell. + :return: Euclidean distance. + """ + return self.scenario.minkowski_distance(2, self.location, other) + + def chebyshev_distance(self, other: C) -> int: + """ + Computes Chebyshev distance from cell to another cell. + :param other: destination cell. + :return: Chebyshev distance. + """ + return self.scenario.minkowski_distance(math.inf, self.location, other) + + def neighbor(self, relative: C) -> C: + """ + Computes the coordinates of a neighboring cell from a relative distance vector. + :param relative: relative distance vector. + :return: coordinates of neighboring cell. + """ + return self.scenario.cell_to(self.location, relative) + + def relative(self, neighbor: C) -> C: + """ + Computes the relative distance vector from the coordinates of a neighboring cell. + :param neighbor: coordinates of a neighboring cell. + :return: relative distance vector. + """ + return self.scenario.distance_vector(self.location, neighbor) + + def _load_neighborhood(self) -> Dict[C, V]: + return self._config.load_cell_neighborhood(self.cell_id) diff --git a/xdevs/celldevs/inout.py b/xdevs/celldevs/inout.py new file mode 100644 index 0000000..c15faa0 --- /dev/null +++ b/xdevs/celldevs/inout.py @@ -0,0 +1,52 @@ +from __future__ import annotations +from typing import ClassVar, Dict, Generic, Optional, Type, Callable, Any +from xdevs.models import Port +from xdevs.abc.transducer import Transducible, T +from xdevs.celldevs import C, S + + +class CellMessage(Transducible, Generic[C, S]): + + state_t: ClassVar[Type[S]] = None + + def __init__(self, cell_id: C, cell_state: S): + self.cell_id = cell_id + self.cell_state = cell_state + + @classmethod + def transducer_map(cls) -> dict[str, tuple[Type[T], Callable[[Any], T]]]: + if issubclass(cls.state_t, Transducible): + res = {'cell_id': (str, lambda x: x.cell_id)} + for field, (t, l) in cls.state_t.transducer_map().items(): + res[field] = (t, lambda x: l(x.cell_state)) + return res + # return { + # 'cell_id': (str, lambda x: x.cell_id), + # **{field: (t, lambda x: l(x.cell_state)) for field, (t, l) in cls.state_t.transducer_map().items()} + #} + return { + 'cell_id': (str, lambda x: x.cell_id), + 'cell_state': (str, lambda x: x.cell_state), + } + +class InPort(Generic[C, S]): + def __init__(self, serve: bool = False): + """ + Cell-DEVS in port. + :param serve: set to True if the port is going to be accessible via RPC server. Defaults to False. + """ + self.port: Port[CellMessage[C, S]] = Port(CellMessage, 'in_celldevs', serve) + self.history: Dict[C, S] = dict() + + def read_new_events(self): + """It stores the latest incoming events into self.history""" + for cell_message in self.port.values: + self.history[cell_message.cell_id] = cell_message.cell_state + + def get(self, cell_id: C) -> Optional[S]: + """ + Returns latest received event. + :param cell_id: ID of the cell that sent the event. + :return: latest received event. If no event has been received, it returns None. + """ + return self.history.get(cell_id) diff --git a/xdevs/examples/async_rt/basic.py b/xdevs/examples/async_rt/basic.py deleted file mode 100644 index 5d2bf1c..0000000 --- a/xdevs/examples/async_rt/basic.py +++ /dev/null @@ -1,271 +0,0 @@ -import logging -import queue -import random -import time - -from xdevs import PHASE_ACTIVE, PHASE_PASSIVE, get_logger -from xdevs.models import Atomic, Coupled, Port -from xdevs.rt import RealTimeCoordinator, RealTimeManager - -logger = get_logger(__name__, logging.INFO) - -PHASE_DONE = "done" - - -class Job: - def __init__(self, name): - self.name = name - self.time = 0 - - def __str__(self): - return f'Job:: {self.name}' - - -class Generator(Atomic): - - def __init__(self, name, period): - super().__init__(name) - self.i_start = Port(Job, "i_start") - self.i_extern = Port(Job, "i_extern") # receives additional jobs from outside - self.i_stop = Port(Job, "i_stop") - self.o_out = Port(Job, "o_out") - - self.add_in_port(self.i_start) - self.add_in_port(self.i_stop) - self.add_in_port(self.i_extern) - self.add_out_port(self.o_out) - - self.period = period - self.job_counter = 1 - self.extern_jobs = list() # stores external jobs - - self.generate = True - - def initialize(self): - self.hold_in(PHASE_ACTIVE, self.period) - - def exit(self): - pass - - def deltint(self): - self.job_counter += 1 - self.extern_jobs.clear() - self.hold_in(PHASE_ACTIVE, self.period) - - def deltext(self, e): - self.sigma -= e - for msg in self.i_extern.values: - logger.info("Generator received external job. It will forward it in the next lambda") - self.extern_jobs.append(msg) - if not self.i_stop.empty(): - self.generate = False - - def lambdaf(self): - if self.generate: - job = Job(str(self.job_counter)) - self.o_out.add(job) - logger.info("Starting job %s @ t_r = %f" % (job.name, time.time())) - for msg in self.extern_jobs: # we also forward external messages - self.o_out.add(msg) - logger.info("Starting job %s @ t_r = %f" % (msg.name, time.time())) - - -class Processor(Atomic): - def __init__(self, name, proc_time): - super().__init__(name) - - self.i_in = Port(Job, "i_in") - self.o_out = Port(Job, "o_out") - - self.add_in_port(self.i_in) - self.add_out_port(self.o_out) - - self.current_job = None - self.proc_time = proc_time - - def initialize(self): - self.passivate() - - def exit(self): - pass - - def deltint(self): - self.passivate() - - def deltext(self, e): - if self.phase == PHASE_PASSIVE: - self.current_job = self.i_in.get() - self.hold_in(PHASE_ACTIVE, self.proc_time) - self.continuef(e) - - def lambdaf(self): - self.o_out.add(self.current_job) - logger.info("Job %s finished @ t_r = %f" % (self.current_job.name, time.time())) - - -class Transducer(Atomic): - - def __init__(self, name, obs_time): - super().__init__(name) - - self.i_arrived = Port(Job, "i_arrived") - self.i_solved = Port(Job, "i_solved") - self.o_out = Port(Job, "o_out") - - self.add_in_port(self.i_arrived) - self.add_in_port(self.i_solved) - self.add_out_port(self.o_out) - - self.jobs_arrived = [] - self.jobs_solved = [] - - self.total_ta = 0 - self.clock = 0 - self.obs_time = obs_time - - def initialize(self): - self.hold_in(PHASE_ACTIVE, self.obs_time) - - def exit(self): - pass - - def deltint(self): - self.clock += self.sigma - - if self.phase == PHASE_ACTIVE: - if self.jobs_solved: - avg_ta = self.total_ta / len(self.jobs_solved) - throughput = len(self.jobs_solved) / self.clock if self.clock > 0 else 0 - else: - avg_ta = 0 - throughput = 0 - - logger.info("End time: %f" % self.clock) - logger.info("Jobs arrived: %d" % len(self.jobs_arrived)) - logger.info("Jobs solved: %d" % len(self.jobs_solved)) - logger.info("Average TA: %f" % avg_ta) - logger.info("Throughput: %f\n" % throughput) - - self.hold_in(PHASE_DONE, 0) - else: - self.passivate() - - def deltext(self, e): - self.clock += e - - if self.phase == PHASE_ACTIVE: - for job in self.i_arrived.values: - # logger.info("Starting job %s @ t = %d @ t_r = %f" % (job.name, self.clock, time.time())) - job.time = self.clock - self.jobs_arrived.append(job) - - if self.i_solved: - job = self.i_solved.get() - # logger.info("Job %s finished @ t = %d @ t_r = %f" % (job.name, self.clock, time.time())) - self.total_ta += self.clock - job.time - self.jobs_solved.append(job) - - self.continuef(e) - - def lambdaf(self): - if self.phase == PHASE_DONE: - self.o_out.add(Job("null")) - - -class RTGpt(Coupled): - def __init__(self, name, period, obs_time): - super().__init__(name) - - if period < 1: - raise ValueError("period has to be greater than 0") - - if obs_time < 0: - raise ValueError("obs_time has to be greater or equal than 0") - - gen = Generator("generator", period) - proc = Processor("processor", 3 * period) - trans = Transducer("transducer", obs_time) - - self.add_component(gen) - self.add_component(proc) - self.add_component(trans) - - # new input port for receiving input events - self.i_extern = Port(Job, "i_extern") - self.add_in_port(self.i_extern) - # new coupling for forwarding messages to generator - self.add_coupling(self.i_extern, gen.i_extern) - - # new output port for sending solved jobs - self.o_extern_proc = Port(Job, 'o_extern_proc') - self.o_extern_gen = Port(Job, 'o_extern_gen') - self.o_extern_trans = Port(Job, 'o_extern_trans') - self.add_out_port(self.o_extern_proc) - self.add_out_port(self.o_extern_gen) - self.add_out_port(self.o_extern_trans) - # new coupling - self.add_coupling(proc.o_out, self.o_extern_proc) - self.add_coupling(gen.o_out, self.o_extern_gen) - self.add_coupling(trans.o_out, self.o_extern_trans) - - self.add_coupling(gen.o_out, proc.i_in) - self.add_coupling(gen.o_out, trans.i_arrived) - self.add_coupling(proc.o_out, trans.i_solved) - self.add_coupling(trans.o_out, gen.i_stop) - - -def inject_messages(q: queue.SimpleQueue): - i = -1 - while True: - f = round(random.gauss(3, 0.6), 2) - # f = 3 - time.sleep(f) # duermo f segundos - # la cola espera tuplas (port_name, msg) - q.put(("i_extern", Job(i))) - i -= 1 - # test ventana manager - # time.sleep(0.3) - # q.put(("i_extern", Job(i))) - # i -= 1 - - -if __name__ == '__main__': - execution_time = 31 - time_scale = 1 - max_jitter = 0.2 - event_window = 0.5 - - gpt = RTGpt("gpt", 2, 3600) - - manager = RealTimeManager(max_jitter=max_jitter, time_scale=time_scale, event_window=event_window) - - parsers = { - 'i_extern': lambda x: Job(x), # le digo al input handler como convertir el string a Job con una funciĆ³n - 'tcp': lambda x: x.decode().split('.'), - } - manager.add_input_handler('csv_handler', file="prueba.csv", msg_parsers=parsers) - - # manager.add_input_handler('function', function=inject_messages) - # Si no quiero ir repitiendo parsers, se lo tendria que meter al manager - # manager.add_input_handler('tcp_handler', HOST='LocalHost', PORT=5055, parsers=parsers) - - # manager.add_output_handler('csv_out_handler', file='csv_output_v3.csv') - - # manager.add_output_handler('tcp_out_handler', PORT=1234) - - sub: dict = { - 'RTsys/i_extern': 0, - } - #manager.add_input_handler('mqtt_handler', subscriptions=sub, msg_parsers=parsers) - - #manager.add_output_handler('mqtt_handler') - - c = RealTimeCoordinator(gpt, manager) - t_ini = time.time() - print(f' >>> COMENZAMOS : {t_ini}') - c.simulate_iters(time_interv=execution_time) - print(f' >>> FIN : {time.time()}') - print(f' Tiempo a ejecutar (s) = {execution_time * time_scale}') - print(f' Tiempo ejecutado (s) = {(time.time() - t_ini)}') - print(f' Error (%) = ' - f'{((time.time() - t_ini - (execution_time * time_scale)) / (execution_time * time_scale)) * 100}') diff --git a/xdevs/examples/async_rt/csv_output_v3.csv b/xdevs/examples/async_rt/csv_output_v3.csv deleted file mode 100644 index 1276501..0000000 --- a/xdevs/examples/async_rt/csv_output_v3.csv +++ /dev/null @@ -1,40 +0,0 @@ -t,port,msg -2.011455774307251,o_extern_gen,Job:: 1 -4.015483379364014,o_extern_gen,Job:: 2 -4.015483379364014,o_extern_gen,Job:: -1_csv -4.015483379364014,o_extern_gen,Job:: -1 -6.0036680698394775,o_extern_proc,Job:: 1 -6.004634857177734,o_extern_gen,Job:: 3 -6.004634857177734,o_extern_gen,Job:: -2 -8.01294207572937,o_extern_gen,Job:: 4 -8.01294207572937,o_extern_gen,Job:: -2_csv -10.01301908493042,o_extern_gen,Job:: 5 -10.01401972770691,o_extern_gen,Job:: -3_csv -10.01401972770691,o_extern_gen,Job:: -3 -12.008435010910034,o_extern_proc,Job:: 3 -12.008435010910034,o_extern_gen,Job:: 6 -14.014714479446411,o_extern_gen,Job:: 7 -14.014714479446411,o_extern_gen,Job:: -4_csv -14.014714479446411,o_extern_gen,Job:: -4 -16.0102641582489,o_extern_gen,Job:: 8 -16.011170864105225,o_extern_gen,Job:: -5_csv -16.011170864105225,o_extern_gen,Job:: -5 -18.002914667129517,o_extern_proc,Job:: 6 -18.002914667129517,o_extern_gen,Job:: 9 -20.006149291992188,o_extern_gen,Job:: 10 -20.006149291992188,o_extern_gen,Job:: -6 -20.006149291992188,o_extern_gen,Job:: -6_csv -22.01274347305298,o_extern_gen,Job:: 11 -22.01274347305298,o_extern_gen,Job:: -7 -22.01274347305298,o_extern_gen,Job:: -7_csv -24.00597643852234,o_extern_proc,Job:: 9 -24.00597643852234,o_extern_gen,Job:: 12 -24.00597643852234,o_extern_gen,Job:: -8 -26.00346088409424,o_extern_gen,Job:: 13 -26.00346088409424,o_extern_gen,Job:: -8_csv -28.013100385665894,o_extern_gen,Job:: 14 -28.013100385665894,o_extern_gen,Job:: -9 -28.013100385665894,o_extern_gen,Job:: -9_csv -30.007229804992676,o_extern_proc,Job:: 12 -30.007229804992676,o_extern_gen,Job:: 15 -30.007229804992676,o_extern_gen,Job:: -10 diff --git a/xdevs/examples/async_rt/prueba.csv b/xdevs/examples/async_rt/prueba.csv deleted file mode 100644 index 454c49a..0000000 --- a/xdevs/examples/async_rt/prueba.csv +++ /dev/null @@ -1,10 +0,0 @@ -t,port,msg -3,i_extern,-1_csv -3,i_extern,-2_csv -3,i_extern,-3_csv -3,i_extern,-4_csv -3,i_extern,-5_csv -3,i_extern,-6_csv -3,i_extern,-7_csv -3,i_extern,-8_csv -3,i_extern,-9_csv \ No newline at end of file diff --git a/xdevs/examples/celldevs_sir/__init__.py b/xdevs/examples/celldevs_sir/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/xdevs/examples/celldevs_sir/main.py b/xdevs/examples/celldevs_sir/main.py new file mode 100644 index 0000000..52ee898 --- /dev/null +++ b/xdevs/examples/celldevs_sir/main.py @@ -0,0 +1,37 @@ +import math +from xdevs.celldevs.inout import CellMessage +from xdevs.models import Coupled +from xdevs.sim import Coordinator +from xdevs.factory import Transducer, Transducers +from sir_coupled import SIRGridCoupled +from sir_sink import SIRSink, State + + +class SIRModel(Coupled): + def __init__(self, config_path: str): + super().__init__() + self.celldevs = SIRGridCoupled(config_path) + self.sink = SIRSink('sink') + + self.add_component(self.celldevs) + self.add_component(self.sink) + self.add_coupling(self.celldevs.sink_port, self.sink.in_sink) + + +if __name__ == '__main__': + model = SIRModel('scenario.json') + + celldevs_transducer: Transducer = Transducers.create_transducer('csv', transducer_id='celldevs', + event_type=CellMessage, include_names=False, + exhaustive=True) + celldevs_transducer.add_target_port(model.celldevs.sink_port) + + sink_transducer: Transducer = Transducers.create_transducer('csv', transducer_id='sink', + event_type=State, include_names=False) + sink_transducer.add_target_port(model.sink.out_sink) + + coordinator = Coordinator(model) + coordinator.add_transducer(celldevs_transducer) + coordinator.add_transducer(sink_transducer) + coordinator.initialize() + coordinator.simulate_time(math.inf) diff --git a/xdevs/examples/celldevs_sir/scenario.json b/xdevs/examples/celldevs_sir/scenario.json new file mode 100644 index 0000000..2354737 --- /dev/null +++ b/xdevs/examples/celldevs_sir/scenario.json @@ -0,0 +1,49 @@ +{ + "scenario":{ + "shape": [25, 25], + "origin": [-12, -12], + "wrapped": false + }, + "cells": { + "default": { + "delay": "inertial", + "cell_type": "hoya", + "neighborhood": [ + { + "type": "von_neumann", + "range": 1, + "vicinity": { + "connectivity": 1, + "mobility": 0.5 + } + }, + { + "type": "relative", + "neighbors": [[0, 0]], + "vicinity": { + "connectivity": 1, + "mobility": 1 + } + } + ], + "state": { + "population": 100, + "susceptible": 1, + "infected": 0, + "recovered": 0 + }, + "config": { + "virulence": 0.6, + "recovery": 0.4 + }, + "eoc": [["out_celldevs", "out_sink"]] + }, + "epicenter": { + "state": { + "susceptible": 0.7, + "infected": 0.3 + }, + "cell_map": [[0, 0]] + } + } +} diff --git a/xdevs/examples/celldevs_sir/sir_cell.py b/xdevs/examples/celldevs_sir/sir_cell.py new file mode 100644 index 0000000..46f1a30 --- /dev/null +++ b/xdevs/examples/celldevs_sir/sir_cell.py @@ -0,0 +1,68 @@ +from __future__ import annotations +from typing import Callable, Any +from xdevs.celldevs.cell import S +from xdevs.celldevs.grid import C, GridCell, GridCellConfig +from xdevs.abc.transducer import Transducible, T + + +class State(Transducible): + def __init__(self, population: int, susceptible: float, infected: float, recovered: float): + self.population: int = population + self.susceptible: float = susceptible + self.infected: float = infected + self.recovered: float = recovered + + def __eq__(self, other: State): + return self.population == other.population and self.susceptible == other.susceptible \ + and self.infected == other.infected and self.recovered == other.recovered + + @classmethod + def transducer_map(cls) -> dict[str, tuple[type[T], Callable[[Any], T]]]: + return { + 'population': (int, lambda x: x.population), + 'susceptible': (float, lambda x: x.susceptible), + 'infected': (float, lambda x: x.infected), + 'recovered': (float, lambda x: x.recovered) + } + + +class Vicinity: + def __init__(self, connectivity: float, mobility: float): + self.connectivity: float = connectivity + self.mobility: float = mobility + + @property + def correlation(self) -> float: + return self.connectivity * self.mobility + + +class Config: + def __init__(self, virulence: float, recovery: float): + self.virulence = virulence + self.recovery = recovery + + +class SIRGridCell(GridCell[State, Vicinity]): + def __init__(self, cell_id: C, config: GridCellConfig): + super().__init__(cell_id, config) + self.config: Config = Config(**config.cell_config) + + def local_computation(self, cell_state: S) -> S: + new_infections = self.new_infections(cell_state) + new_recoveries = self.new_recoveries(cell_state) + cell_state.recovered = round((cell_state.recovered + new_recoveries) * 100) / 100 + cell_state.infected = round((cell_state.infected + new_infections - new_recoveries) * 100) / 100 + cell_state.susceptible = 1 - cell_state.infected - cell_state.recovered + return cell_state + + def new_infections(self, state: State) -> float: + neighbor_effect = sum(state.infected * state.population * self.neighborhood[neighbor].correlation + for neighbor, state in self.neighbors_state.items()) + new_infections = state.susceptible * self.config.virulence * neighbor_effect / state.population + return min(state.susceptible, new_infections) + + def new_recoveries(self, state: State) -> float: + return state.infected * self.config.recovery + + def output_delay(self, cell_state: S) -> float: + return 1 diff --git a/xdevs/examples/celldevs_sir/sir_coupled.py b/xdevs/examples/celldevs_sir/sir_coupled.py new file mode 100644 index 0000000..dbac3d1 --- /dev/null +++ b/xdevs/examples/celldevs_sir/sir_coupled.py @@ -0,0 +1,36 @@ +import math + +from xdevs.celldevs.coupled import CoupledGridCellDEVS +from xdevs.celldevs.grid import C, GridCellConfig, GridCell +from xdevs.celldevs.inout import CellMessage +from xdevs.models import Port +from xdevs.sim import Coordinator +from xdevs.factory import Transducer, Transducers +from sir_cell import State, Vicinity, SIRGridCell + + +class SIRGridCoupled(CoupledGridCellDEVS[State, Vicinity]): + def __init__(self, config_file: str): + super().__init__(State, Vicinity, config_file) + self.sink_port = Port(CellMessage, 'out_sink') + self.add_out_port(self.sink_port) + + self.load_config() + self.load_cells() + self.load_couplings() + + def create_cell(self, cell_type: str, cell_id: C, cell_config: GridCellConfig[State, Vicinity]) -> GridCell[State, Vicinity]: + return SIRGridCell(cell_id, cell_config) + + +if __name__ == '__main__': + model = SIRGridCoupled('scenario.json') + + transducer: Transducer = Transducers.create_transducer('csv', transducer_id='transducer', event_type=CellMessage, + include_names=False, exhaustive=True) + transducer.add_target_port(model.sink_port) + + coordinator = Coordinator(model) + coordinator.add_transducer(transducer) + coordinator.initialize() + coordinator.simulate_time(math.inf) diff --git a/xdevs/examples/celldevs_sir/sir_sink.py b/xdevs/examples/celldevs_sir/sir_sink.py new file mode 100644 index 0000000..105cae4 --- /dev/null +++ b/xdevs/examples/celldevs_sir/sir_sink.py @@ -0,0 +1,53 @@ +from typing import Dict, NoReturn, Optional, Tuple +from sir_cell import State +from xdevs.celldevs.inout import CellMessage +from xdevs.models import Atomic, Port + + +class SIRSink(Atomic): + def __init__(self, name: str = None): + super().__init__(name) + self.started: bool = False + self.cell_reports: Dict[Tuple[int, ...], State] = dict() + self.scenario_report: Optional[State] = None + + self.in_sink: Port[CellMessage[Tuple[int, ...], State]] = Port(CellMessage, 'in_sink') + self.out_sink: Port[State] = Port(State, 'out_sink') + self.add_in_port(self.in_sink) + self.add_out_port(self.out_sink) + + def deltint(self) -> NoReturn: + self.passivate() + self.started = True + + def deltext(self, e: float) -> NoReturn: + self.activate() + for msg in self.in_sink.values: + if self.started: + prev_report = self.cell_reports[msg.cell_id] + delta_s = msg.cell_state.susceptible - prev_report.susceptible + delta_i = msg.cell_state.infected - prev_report.infected + delta_r = msg.cell_state.recovered - prev_report.recovered + self.scenario_report.susceptible += delta_s * prev_report.population / self.scenario_report.population + self.scenario_report.infected += delta_i * prev_report.population / self.scenario_report.population + self.scenario_report.recovered += delta_r * prev_report.population / self.scenario_report.population + self.cell_reports[msg.cell_id] = msg.cell_state + if not self.started: + self.scenario_report: State = State(0, 0, 0, 0) + for cell_state in self.cell_reports.values(): + self.scenario_report.population += cell_state.population + self.scenario_report.susceptible += cell_state.population * cell_state.susceptible + self.scenario_report.infected += cell_state.population * cell_state.infected + self.scenario_report.recovered += cell_state.population * cell_state.recovered + self.scenario_report.susceptible /= self.scenario_report.population + self.scenario_report.infected /= self.scenario_report.population + self.scenario_report.recovered /= self.scenario_report.population + + def lambdaf(self) -> NoReturn: + self.out_sink.add(self.scenario_report) + + def initialize(self) -> NoReturn: + self.passivate() + + def exit(self) -> NoReturn: + pass diff --git a/xdevs/factory.py b/xdevs/factory.py index 49690c6..3ec298b 100644 --- a/xdevs/factory.py +++ b/xdevs/factory.py @@ -3,7 +3,8 @@ import sys from importlib.metadata import entry_points, EntryPoint from typing import ClassVar -from xdevs.abc import InputHandler, OutputHandler, Transducer +from xdevs.abc import InputHandler, OutputHandler, Transducer, DelayedOutput +from xdevs.celldevs import C from xdevs.models import Atomic, Component, Port, Coupled @@ -244,3 +245,22 @@ def from_json(file_path: str): config = data[name] # Gets the actual component config return Components._nested_component(name, config) + + +class DelayedOutputs: + + _plugins: ClassVar[dict[str, type[DelayedOutput]]] = { + ep.name: ep.load() for ep in load_entry_points('xdevs.celldevs_outputs') + } + + @staticmethod + def add_plugin(delay_id: str, plugin: type[DelayedOutput]): + if delay_id in DelayedOutputs._plugins: + raise ValueError('xDEVS Cell-DEVS delayed output plugin with name "{}" already exists'.format(delay_id)) + DelayedOutputs._plugins[delay_id] = plugin + + @staticmethod + def create_delayed_output(delay_id: str, cell_id: C, serve: bool = False) -> DelayedOutput: + if delay_id not in DelayedOutputs._plugins: + raise ValueError('xDEVS Cell-DEVS delayed output plugin with name "{}" not found'.format(delay_id)) + return DelayedOutputs._plugins[delay_id](cell_id, serve) diff --git a/xdevs/plugins/celldevs_outputs/__init__.py b/xdevs/plugins/celldevs_outputs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/xdevs/plugins/celldevs_outputs/hybrid.py b/xdevs/plugins/celldevs_outputs/hybrid.py new file mode 100644 index 0000000..85c8d57 --- /dev/null +++ b/xdevs/plugins/celldevs_outputs/hybrid.py @@ -0,0 +1,26 @@ +from __future__ import annotations +from collections import deque +from typing import Deque, Generic +from xdevs.abc.celldevs import C, S, DelayedOutput, INFINITY + + +class HybridDelayedOutput(DelayedOutput[C, S], Generic[C, S]): + def __init__(self, cell_id: C, serve: bool = False): + super().__init__(cell_id, serve) + self.last_state: S | None = None + self.next_states: Deque[tuple[float, S]] = deque() + + def add_to_buffer(self, when: float, state: S): + while self.next_states and self.next_states[-1][0] >= when: + self.next_states.pop() + self.next_states.append((when, state)) + + def next_time(self) -> float: + return INFINITY if not self.next_states else self.next_states[0][0] + + def next_state(self) -> S: + return self.last_state if not self.next_states else self.next_states[0][1] + + def pop_state(self): + if self.next_states: + self.last_state = self.next_states.popleft()[1] diff --git a/xdevs/plugins/celldevs_outputs/inertial.py b/xdevs/plugins/celldevs_outputs/inertial.py new file mode 100644 index 0000000..34f7808 --- /dev/null +++ b/xdevs/plugins/celldevs_outputs/inertial.py @@ -0,0 +1,22 @@ +from __future__ import annotations +from typing import Generic +from xdevs.abc.celldevs import C, S, DelayedOutput, INFINITY + + +class InertialDelayedOutput(DelayedOutput[C, S], Generic[C, S]): + def __init__(self, cell_id: C, serve: bool = False): + super().__init__(cell_id, serve) + self.last_state: S | None = None + self.next_t: float = INFINITY + + def add_to_buffer(self, when: float, state: S): + self.next_t, self.last_state = when, state + + def next_time(self) -> float: + return self.next_t + + def next_state(self) -> S: + return self.last_state + + def pop_state(self): + self.next_t = INFINITY diff --git a/xdevs/plugins/celldevs_outputs/transport.py b/xdevs/plugins/celldevs_outputs/transport.py new file mode 100644 index 0000000..acdc285 --- /dev/null +++ b/xdevs/plugins/celldevs_outputs/transport.py @@ -0,0 +1,27 @@ +from __future__ import annotations +from typing import Generic +from queue import PriorityQueue +from xdevs.abc.celldevs import C, S, DelayedOutput, INFINITY + + +class TransportDelayedOutput(DelayedOutput[C, S], Generic[C, S]): + def __init__(self, cell_id: C, serve: bool = False): + super().__init__(cell_id, serve) + self.last_state: S | None = None + self.schedule: PriorityQueue = PriorityQueue() + self.next_states: dict[float, S] = dict() + + def add_to_buffer(self, when: float, state: S): + if when not in self.next_states: + self.schedule.put(when) + self.next_states[when] = state + + def next_time(self) -> float: + return self.schedule.queue[0] if self.next_states else INFINITY + + def next_state(self) -> S: + return self.next_states[self.schedule.queue[0]] if self.next_states else self.last_state + + def pop_state(self): + if not self.schedule.empty(): + self.last_state = self.next_states.pop(self.schedule.get())