-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
2f81b63
commit 20f9695
Showing
23 changed files
with
1,033 additions
and
324 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
from .celldevs import DelayedOutput | ||
from .handler import InputHandler, OutputHandler | ||
from .transducer import Transducer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import Generic | ||
from xdevs import INFINITY | ||
from xdevs.models import Port | ||
from xdevs.celldevs import C, S | ||
|
||
|
||
class DelayedOutput(Generic[C, S], ABC): | ||
def __init__(self, cell_id: C, serve: bool = False): | ||
""" | ||
Cell-DEVS delayed output port. This is an abstract base class. | ||
:param cell_id: ID of the cell that owns this delayed output. | ||
:param serve: set to True if the port is going to be accessible via RPC server. Defaults to False. | ||
""" | ||
from xdevs.celldevs.inout import CellMessage | ||
self.cell_id = cell_id | ||
self.port: Port[CellMessage[C, S]] = Port(CellMessage, 'out_celldevs', serve) | ||
|
||
@abstractmethod | ||
def add_to_buffer(self, when: float, state: S): | ||
""" | ||
Schedules a cell state to send events. | ||
:param when: time at which the events must be sent. | ||
:param state: cell state. Events will be obtained by mapping this state. | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def next_time(self) -> float: | ||
""":return: next time at which events must be sent.""" | ||
pass | ||
|
||
@abstractmethod | ||
def next_state(self) -> S: | ||
""":return: next cell state used to generate events.""" | ||
pass | ||
|
||
@abstractmethod | ||
def pop_state(self): | ||
"""removes schedule state from the delayed output.""" | ||
pass | ||
|
||
def send_events(self, time: float): | ||
""" | ||
If there is an scheduled state, it sends a new event via every Cell-DEVS output port. | ||
:param time: current simulation time. | ||
""" | ||
from xdevs.celldevs.inout import CellMessage | ||
if self.next_time() <= time: | ||
self.port.add(CellMessage(self.cell_id, self.next_state())) | ||
|
||
def clean(self, time: float): | ||
""" | ||
It cleans all the outdated scheduled cell states. | ||
:param time: current simulation time. | ||
""" | ||
while self.next_time() < INFINITY and self.next_time() <= time: | ||
self.pop_state() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from __future__ import annotations | ||
from typing import TypeVar | ||
|
||
C = TypeVar('C') # Variable type used for cell IDs | ||
S = TypeVar('S') # Variable type used for cell states | ||
V = TypeVar('V') # Variable type used for cell vicinities |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
from __future__ import annotations | ||
from abc import ABC, abstractmethod | ||
from copy import deepcopy | ||
from typing import Any, Generic | ||
from xdevs.celldevs import C, S, V | ||
from xdevs.celldevs.inout import CellMessage, InPort | ||
from xdevs.models import Atomic | ||
from xdevs.factory import DelayedOutputs, DelayedOutput | ||
|
||
|
||
class CellConfig(Generic[C, S, V]): | ||
def __init__(self, config_id: str, c_type: type[C], s_type: type[S], v_type: type[V], **kwargs): | ||
""" | ||
Cell-DEVS configuration structure. | ||
:param config_id: identifier of the configuration. | ||
:param c_type: type used to identify cells. | ||
:param s_type: type used to represent cell states. | ||
:param v_type: type used to represent vicinity between cells. | ||
:param cell_type: identifier of the cell type. | ||
:param delay: identifier of the delay buffer implemented by the cell. By default, it is set to inertial. | ||
:param config: any additional configuration parameters. | ||
:param state: parameters required to create the initial state of the cell. | ||
:param neighborhood: representation of the cell neighborhood. By default, it is empty. | ||
:param cell_map: list of cells that implement this configuration. By default, it is empty. | ||
:param eic: list of external input couplings. By default, it is empty. | ||
:param eoc: list of external output couplings. By default, it is empty. | ||
""" | ||
self.config_id: str = config_id | ||
self.c_type: type[C] = c_type | ||
self.s_type: type[S] = s_type | ||
self.v_type: type[V] = v_type | ||
CellMessage.state_t = s_type | ||
|
||
self.cell_type: str = kwargs['cell_type'] | ||
self.delay_type: str = kwargs.get('delay', 'inertial') | ||
self.cell_config = kwargs.get('config') | ||
self.state = kwargs.get('state') | ||
self.raw_neighborhood: list[dict] = kwargs.get('neighborhood', list()) | ||
self.cell_map: list[C] | None = None if self.default else self._load_map(*kwargs.get('cell_map', list())) | ||
self.eic: list[tuple[str, str]] = self._parse_couplings(kwargs.get('eic', list())) | ||
self.ic: list[tuple[str, str]] = [('out_celldevs', 'in_celldevs')] | ||
self.eoc: list[tuple[str, str]] = self._parse_couplings(kwargs.get('eoc', list())) | ||
|
||
@property | ||
def default(self) -> bool: | ||
""":return: true if this configuration profile is the default one.""" | ||
return self.config_id == 'default' | ||
|
||
def apply_patch(self, config_id: str, **kwargs): | ||
""" | ||
Applies a configuration patch. This method is used for non-default configurations. | ||
:param config_id: configuration ID. | ||
:param cell_type: identifier of the cell type. | ||
:param delay: identifier of the delay buffer implemented by the cell. By default, it is set to inertial. | ||
:param config: any additional configuration parameters. | ||
:param state: parameters required to create the initial state of the cell. | ||
:param neighborhood: representation of the cell neighborhood. By default, it is empty. | ||
:param cell_map: list of cells that implement this configuration. By default, it is empty. | ||
:param eic: list of external input couplings. By default, it is empty. | ||
:param ic: list of internal couplings. By default, it is empty. # TODO remove this? | ||
:param eoc: list of external output couplings. By default, it is empty. | ||
""" | ||
self.config_id = config_id | ||
self.cell_type = kwargs.get('cell_type', self.cell_type) | ||
self.delay_type = kwargs.get('delay', self.delay_type) | ||
if 'config' in kwargs: | ||
self.cell_config = self._patch_dict(self.cell_config, kwargs['config']) \ | ||
if isinstance(self.cell_config, dict) else kwargs['config'] | ||
if 'state' in kwargs: | ||
self.state = self._patch_dict(self.state, kwargs['state']) \ | ||
if isinstance(self.state, dict) else kwargs['state'] | ||
self.raw_neighborhood = kwargs.get('neighborhood', self.raw_neighborhood) | ||
if 'cell_map' in kwargs: | ||
self.cell_map = self._load_map(*kwargs['cell_map']) | ||
if 'eic' in kwargs: | ||
self.eic = self._parse_couplings(kwargs['eic']) | ||
if 'ic' in kwargs: | ||
self.ic = self._parse_couplings(kwargs['ic']) | ||
if 'eoc' in kwargs: | ||
self.eoc = self._parse_couplings(kwargs['eoc']) | ||
|
||
def load_state(self) -> S: | ||
""":return: a new initial state structure.""" | ||
return self._load_value(self.s_type, self.state) | ||
|
||
def load_neighborhood(self) -> dict[C, V]: | ||
""":return: a new neighborhood.""" | ||
neighbors: dict[C, V] = dict() | ||
for neighborhood in self.raw_neighborhood: | ||
for neighbor, vicinity in neighborhood.items(): | ||
neighbors[self.c_type(neighbor)] = self._load_vicinity(vicinity) | ||
return neighbors | ||
|
||
def _load_map(self, *args) -> list[C]: | ||
return [self.c_type(self.config_id)] | ||
|
||
def _load_vicinity(self, vicinity: Any): | ||
return self._load_value(self.v_type, vicinity) | ||
|
||
@classmethod | ||
def _patch_dict(cls, d: dict, patch: dict) -> dict: | ||
for k, v in patch.items(): | ||
d[k] = cls._patch_dict(d[k], v) if isinstance(v, dict) and k in d and isinstance(d[k], dict) else v | ||
return d | ||
|
||
@staticmethod | ||
def _parse_couplings(couplings: list[list[str]]) -> list[tuple[str, str]]: | ||
return [(coupling[0], coupling[1]) for coupling in couplings] | ||
|
||
@staticmethod | ||
def _load_value(t_type, params: Any): | ||
params = deepcopy(params) | ||
if isinstance(params, dict): | ||
return t_type(**params) | ||
elif isinstance(params, list): | ||
return t_type(*params) | ||
elif params is not None: | ||
return t_type(params) | ||
return t_type() | ||
|
||
|
||
class Cell(Atomic, ABC, Generic[C, S, V]): | ||
def __init__(self, cell_id: C, config: CellConfig[C, S, V]): | ||
""" | ||
Abstract Base Class for a Cell-DEVS cell. | ||
:param cell_id: cell identifier. | ||
:param config: cell configuration structure. | ||
""" | ||
super().__init__(str(cell_id)) | ||
self._clock: float = 0 | ||
self._config: CellConfig = config | ||
self.ics = config.eic | ||
self.cell_id: C = cell_id | ||
self.cell_state: S = config.load_state() | ||
self.neighborhood: dict[C, V] = self._load_neighborhood() | ||
|
||
self.in_celldevs: InPort[C, S] = InPort(self.cell_id) | ||
self.out_celldevs: DelayedOutput[C, S] = DelayedOutputs.create_delayed_output(config.delay_type, self.cell_id) | ||
self.add_in_port(self.in_celldevs.port) | ||
self.add_out_port(self.out_celldevs.port) | ||
|
||
@property | ||
def neighbors_state(self) -> dict[C, S]: | ||
return self.in_celldevs.history | ||
|
||
@abstractmethod | ||
def local_computation(self, cell_state: S) -> S: | ||
""" | ||
Computes new cell state depending on its previous state. | ||
:param cell_state: current cell state. | ||
:return: new cell state. | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def output_delay(self, cell_state: S) -> float: | ||
""" | ||
Returns delay to be applied to output messages related to new cell state. | ||
:param cell_state: new cell state. | ||
:return: delay to be applied. | ||
""" | ||
pass | ||
|
||
def deltint(self): | ||
self._clock += self.sigma | ||
self.out_celldevs.clean(self._clock) | ||
self.sigma = self.out_celldevs.next_time() - self._clock | ||
|
||
def deltext(self, e: float): | ||
self._clock += e | ||
self.sigma -= e | ||
self.in_celldevs.read_new_events() | ||
|
||
new_state = self.local_computation(deepcopy(self.cell_state)) | ||
if new_state != self.cell_state: | ||
state = deepcopy(new_state) | ||
self.out_celldevs.add_to_buffer(self._clock + self.output_delay(state), state) | ||
self.sigma = self.out_celldevs.next_time() - self._clock | ||
self.cell_state = new_state | ||
|
||
def lambdaf(self): | ||
self.out_celldevs.send_events(self._clock + self.sigma) | ||
|
||
def initialize(self): | ||
self.out_celldevs.add_to_buffer(0, self.cell_state) | ||
self.activate() | ||
|
||
def exit(self): | ||
pass | ||
|
||
def _load_neighborhood(self) -> dict[C, V]: | ||
return self._config.load_neighborhood() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
from __future__ import annotations | ||
import json | ||
from abc import abstractmethod, ABC | ||
from copy import deepcopy | ||
from typing import Dict, Generic, Optional, Tuple, Type | ||
from xdevs.celldevs import C, S, V | ||
from xdevs.celldevs.cell import Cell, CellConfig | ||
from xdevs.celldevs.grid import GridCell, GridCellConfig, GridScenario | ||
from xdevs.models import Coupled | ||
|
||
|
||
class CoupledCellDEVS(Coupled, ABC, Generic[C, S, V]): | ||
def __init__(self, c_type: Type[C], s_type: Type[S], v_type: Type[V], config_file: str, name: Optional[str] = None): | ||
super().__init__(name) | ||
self.c_type: Type[C] = c_type | ||
self.s_type: Type[S] = s_type | ||
self.v_type: Type[V] = v_type | ||
with open(config_file) as file: | ||
self.raw_config = json.load(file) | ||
self._configs: Dict[str, CellConfig[C, S, V]] = dict() | ||
self._cells: Dict[C, Tuple[Cell[C, S, V], CellConfig]] = dict() | ||
|
||
def load_config(self): | ||
raw_configs = self.raw_config['cells'] | ||
default_config: CellConfig[C, S, V] = self._load_default_config(raw_configs['default']) | ||
self._configs = {'default': default_config} | ||
for config_id, raw_config in raw_configs.items(): | ||
if config_id != 'default': | ||
config = deepcopy(default_config) | ||
config.apply_patch(config_id, **raw_config) | ||
self._configs[config_id] = config | ||
|
||
def load_cells(self): | ||
for cell_config in self._configs.values(): | ||
if not cell_config.default: | ||
for cell_id in cell_config.cell_map: | ||
if cell_id in self._cells: | ||
raise ValueError('cell with the same ID already exists') | ||
cell: Cell[C, S, V] = self.create_cell(cell_config.cell_type, cell_id, cell_config) | ||
self._cells[cell_id] = (cell, cell_config) | ||
self.add_component(cell) | ||
|
||
def load_couplings(self): | ||
for cell_to, cell_config in self._cells.values(): | ||
for port_from, port_to in cell_config.eic: | ||
self.add_coupling(self.get_in_port(port_from), cell_to.get_in_port(port_to)) | ||
for neighbor in cell_to.neighborhood: | ||
cell_from = self._cells[neighbor][0] | ||
for port_from, port_to in cell_config.ic: | ||
self.add_coupling(cell_from.get_out_port(port_from), cell_to.get_in_port(port_to)) | ||
for port_from, port_to in cell_config.eoc: | ||
self.add_coupling(cell_to.get_out_port(port_from), self.get_out_port(port_to)) | ||
|
||
def _load_default_config(self, raw_config: Dict) -> CellConfig[C, S, V]: | ||
return CellConfig('default', self.c_type, self.s_type, self.v_type, **raw_config) | ||
|
||
@abstractmethod | ||
def create_cell(self, cell_type: str, cell_id: C, cell_config: CellConfig[C, S, V]) -> Cell[C, S, V]: | ||
pass | ||
|
||
|
||
class CoupledGridCellDEVS(CoupledCellDEVS[Tuple[int, ...], S, V], ABC, Generic[S, V]): | ||
|
||
_configs: Dict[str, GridCellConfig] | ||
|
||
def __init__(self, s_type: Type[S], v_type: Type[V], config_file: str): | ||
super().__init__(tuple, s_type, v_type, config_file) | ||
|
||
scenario_config = self.raw_config['scenario'] | ||
shape = tuple(scenario_config['shape']) | ||
origin = tuple(scenario_config['origin']) if 'origin' in scenario_config else None | ||
wrapped = scenario_config.get('wrapped', False) | ||
self.scenario: GridScenario = GridScenario(shape, origin, wrapped) | ||
|
||
def load_cells(self): | ||
super().load_cells() | ||
default_config = self._configs['default'] | ||
for cell_id in self.scenario.iter_cells(): | ||
if cell_id not in self._cells: | ||
cell: GridCell[S, V] = self.create_cell(default_config.cell_type, cell_id, default_config) | ||
self._cells[cell_id] = (cell, default_config) | ||
self.add_component(cell) | ||
|
||
def _load_default_config(self, raw_config: Dict) -> GridCellConfig[S, V]: | ||
return GridCellConfig(self.scenario, 'default', self.s_type, self.v_type, **raw_config) | ||
|
||
@abstractmethod | ||
def create_cell(self, cell_type: str, cell_id: Tuple[int, ...], | ||
cell_config: GridCellConfig[S, V]) -> GridCell[S, V]: | ||
pass |
Oops, something went wrong.