-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add the EventProcessor and make print_stat_doubler use it
- Loading branch information
Showing
9 changed files
with
631 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,244 @@ | ||
""" | ||
The idea behind these: | ||
State observers take many events and generate concise actionable ones | ||
- They observe events | ||
- They accumulate state | ||
- Their state is private | ||
- They generate events | ||
This should simplify the global state and move some of the workarounds | ||
into smaller pieces of code. | ||
The state watchers have one thread that processes incoming events. | ||
Included is an output event handling thread to notify the rest of the app | ||
""" | ||
from enum import Enum | ||
from queue import Queue | ||
from threading import Event, Thread | ||
from typing import Any, Callable, Dict, List, Optional, Set, Union | ||
|
||
|
||
class ProcessorEvent: | ||
"""An event object either enqueued for processing or output from the | ||
state watchers""" | ||
|
||
def __init__(self, name, *args, **kwargs): | ||
self.name = name | ||
self.args = args | ||
self.kwargs = kwargs | ||
|
||
|
||
InputHandlerType = Callable[[ProcessorEvent], | ||
Optional[List[ProcessorEvent]]] | ||
RawHandlerType = Callable[[ProcessorEvent], | ||
Union[None, | ||
ProcessorEvent, | ||
List[ProcessorEvent]]] | ||
OutputHandlerType = Callable[[ProcessorEvent], None] | ||
|
||
|
||
class EventInfo: | ||
"""An object to hold the event info and how to register a callback | ||
that would generate such event. Optionally a callback to de-register it | ||
to save on CPU power when the event is not being consumed by anything""" | ||
|
||
def __init__(self, | ||
name, | ||
registration, | ||
deregistration=None): | ||
self.name = name | ||
self.registration = registration | ||
self.deregistration = deregistration | ||
self.registered = False | ||
self.callback = None | ||
|
||
self.users = set() | ||
|
||
def add_watcher(self, user): | ||
"""Add an EventWatcher consumer to the event""" | ||
self.users.add(user) | ||
if len(self.users) == 1 and not self.registered: | ||
self.registration(self._handler) | ||
self.registered = True | ||
|
||
def remove_watcher(self, user): | ||
"""Remove an EventWatcher consumer from the event""" | ||
if user not in self.users: | ||
return | ||
self.users.remove(user) | ||
if len(self.users) == 0 and self.deregistration is not None: | ||
self.deregistration(self._handler) | ||
self.registered = False | ||
|
||
def set_callback(self, callback: InputHandlerType): | ||
"""Set the callback that will be called when the event is generated | ||
This should be called only by the event processor when registering | ||
the event for the first time""" | ||
if self.callback is not None: | ||
raise ValueError("Callback already set") | ||
self.callback = callback | ||
|
||
def _handler(self, *args, **kwargs): | ||
"""Adapts the callback call to generate the event that is then | ||
enqueued for processing""" | ||
self.callback(ProcessorEvent(self.name, *args, **kwargs)) | ||
|
||
|
||
class StateWatcher: | ||
"""State watchers try to decrease the number of events that influence the | ||
global state. This is their root class""" | ||
|
||
class OutputEvent(Enum): | ||
"""Events that are generated by the state watcher""" | ||
|
||
def __init__(self): | ||
# Maps the event names to handlers | ||
self.event_handlers: Dict[Enum, RawHandlerType] = {} | ||
# Gets filled after the watcher is registered with the event processor | ||
# Allows the event watcher to stop watching for selected events | ||
self.tracked_events: Dict[Enum, EventInfo] = {} | ||
|
||
def register(self, tracked_events: Dict[Any, EventInfo]): | ||
"""Watcher has been registered with the event processor, | ||
here is the event info it's been missing""" | ||
self.tracked_events = tracked_events | ||
for event_info in self.tracked_events.values(): | ||
event_info.add_watcher(self) | ||
self._after_register() | ||
|
||
def _after_register(self): | ||
"""Called after the watcher has been registered with the event | ||
processor""" | ||
|
||
def process_event(self, small_event: ProcessorEvent): | ||
"""Process an event, return a list of output events for later | ||
handling""" | ||
handler = self.event_handlers.get(small_event.name) | ||
if handler: | ||
output = handler(*small_event.args, **small_event.kwargs) | ||
if output is None: | ||
return None | ||
if isinstance(output, ProcessorEvent): | ||
return [output] | ||
if isinstance(output, list): | ||
return output | ||
raise ValueError("The state watcher handler should return None, " | ||
"a ProcessorEvent or a list of ProcessorEvents") | ||
return None | ||
|
||
def _get_event_info(self, event_name): | ||
"""Get the event info for the event name, throw an exception if its | ||
not tracked""" | ||
try: | ||
return self.tracked_events[event_name] | ||
except KeyError as error: | ||
raise ValueError(f"Event {event_name} not tracked by this " | ||
f"watcher") from error | ||
|
||
def _stop_watching(self, event_name: Enum): | ||
"""Stop watching for an event""" | ||
self._get_event_info(event_name).remove_watcher(self) | ||
|
||
def _watch(self, event_name: Enum): | ||
"""Start watching for an event""" | ||
self._get_event_info(event_name).add_watcher(self) | ||
|
||
|
||
class EventProcessor: | ||
"""Allows you to register StateWatchers and their handlers | ||
Idea - make the state watchers be able to decide what events to disable | ||
for themselves""" | ||
|
||
def __init__(self): | ||
self.input_queue: Queue[Optional[ProcessorEvent]] = Queue() | ||
self.output_queue: Queue[Optional[ProcessorEvent]] = Queue() | ||
self.quit_evt = Event() | ||
self.tracked_events: Dict[Enum, EventInfo] = {} | ||
self.watcher_output_events: Dict[Enum, StateWatcher] = {} | ||
self.output_handlers: Dict[Enum, OutputHandlerType] = {} | ||
self.watchers: Dict[EventInfo, Set[StateWatcher]] = {} | ||
|
||
self.ingest_thread = Thread(target=self._ingest_events, | ||
name="SPIngest", | ||
daemon=True) | ||
self.output_thread = Thread(target=self._process_output_events, | ||
name="SPOutput", | ||
daemon=True) | ||
self.ingest_thread.start() | ||
self.output_thread.start() | ||
|
||
def track_event(self, event_info): | ||
"""Adds the event to the tracked ones""" | ||
event_info.set_callback(self.input_queue.put) | ||
self.tracked_events[event_info.name] = event_info | ||
|
||
def _get_event_info(self, event_name): | ||
"""Get the event info for the event name, throw an exception if its | ||
not tracked""" | ||
try: | ||
return self.tracked_events[event_name] | ||
except KeyError as error: | ||
raise ValueError(f"Event {event_name} not tracked") from error | ||
|
||
def add_watcher(self, watcher: StateWatcher): | ||
"""Add a watcher to the list of watchers""" | ||
watcher_event_info_dict = {} | ||
for event_name in watcher.event_handlers: | ||
event_info = self._get_event_info(event_name) | ||
self.watchers.setdefault( | ||
event_info, set()).add(watcher) | ||
watcher_event_info_dict[event_name] = event_info | ||
|
||
for output_event in watcher.OutputEvent: | ||
if output_event in self.watcher_output_events: | ||
raise ValueError("Watchers have to have unique output events") | ||
self.watcher_output_events[output_event] = watcher | ||
|
||
watcher.register(watcher_event_info_dict) | ||
|
||
def add_output_event_handler(self, event_name, handler): | ||
"""Add a handler for an output event""" | ||
if event_name in self.output_handlers: | ||
raise ValueError(f"Event {event_name} already has a handler. " | ||
f"If you need multiple handlers, call them " | ||
f"manually in the controller") | ||
self.output_handlers[event_name] = handler | ||
|
||
def _ingest_events(self): | ||
"""Process events in the queue""" | ||
while not self.quit_evt.is_set(): | ||
event = self.input_queue.get() | ||
# Ignore Nones, used for stopping | ||
if event is None: | ||
continue | ||
event_info = self._get_event_info(event.name) | ||
for watcher in self.watchers[event_info]: | ||
output_events = watcher.process_event(event) | ||
if output_events is None: | ||
continue | ||
for output_event in output_events: | ||
self.output_queue.put(output_event) | ||
|
||
def _process_output_events(self): | ||
"""Call handlers of output events""" | ||
while not self.quit_evt.is_set(): | ||
event = self.output_queue.get() | ||
# Ignore Nones, used for stopping | ||
if event is None: | ||
continue | ||
if event.name in self.output_handlers: | ||
handler = self.output_handlers[event.name] | ||
handler(*event.args, **event.kwargs) | ||
|
||
def stop(self): | ||
"""Stop the thread""" | ||
self.quit_evt.set() | ||
# Unblock the queues | ||
self.input_queue.put(None) | ||
self.output_queue.put(None) | ||
|
||
def wait_stopped(self): | ||
"""Wait for the thread to stop""" | ||
self.ingest_thread.join() | ||
self.output_thread.join() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
"""Utilities for the event processor""" | ||
from threading import Event, Thread | ||
from time import monotonic | ||
|
||
from .event_processor import EventInfo | ||
|
||
|
||
class SerialEventInfoFactory: | ||
"""A factory to create EventInfo objects that generate Events from | ||
the serial signals""" | ||
def __init__(self, serial_parser): | ||
self.serial_parser = serial_parser | ||
|
||
def create(self, name, regexp, priority=0): | ||
"""Creates the EventInfo object""" | ||
|
||
def registration(handler): | ||
"""Registers the handler to the serial parser""" | ||
self.serial_parser.add_handler( | ||
regexp=regexp, | ||
handler=handler, | ||
priority=priority) | ||
|
||
def deregistration(handler): | ||
"""De-registers the handler from the serial parser""" | ||
self.serial_parser.remove_handler( | ||
regexp=regexp, | ||
handler=handler) | ||
|
||
return EventInfo( | ||
name=name, | ||
registration=registration, | ||
deregistration=deregistration, | ||
) | ||
|
||
|
||
class Ticker: | ||
"""A class that calls a callback every interval seconds""" | ||
|
||
def __init__(self, interval=0.2): | ||
self.last_tick = monotonic() | ||
self.interval = interval # seconds | ||
self.quit_evt = Event() | ||
self.callback = None | ||
self.thread = Thread(target=self.ticker, name="Ticker", daemon=True) | ||
self.thread.start() | ||
|
||
def ticker(self): | ||
"""Ticks every interval seconds, calls the callback""" | ||
while not self.quit_evt.is_set(): | ||
self.last_tick = monotonic() | ||
if self.callback is not None: | ||
self.callback() | ||
|
||
wait_amount = self.interval - (monotonic() - self.last_tick) | ||
if wait_amount > 0: | ||
self.quit_evt.wait(wait_amount) | ||
|
||
def set_handler(self, handler): | ||
"""Sets the callback""" | ||
self.callback = handler | ||
|
||
def stop(self): | ||
"""Stops the ticker""" | ||
self.quit_evt.set() | ||
|
||
def wait_stopped(self): | ||
"""Waits for the ticker thread to stop""" | ||
self.thread.join() |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.