Skip to content

Commit 4137df5

Browse files
committed
added callables as events
1 parent eee6c8b commit 4137df5

File tree

7 files changed

+171
-82
lines changed

7 files changed

+171
-82
lines changed

docs/apis.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ Micro-Manager Devices
5252
:members:
5353

5454

55-
Implemented Events
55+
.. _standard_events:
56+
57+
Standard Events
5658
==================
5759

5860
Detector Events

docs/usage/events.rst

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,50 @@
44
Events
55
======
66

7+
Events in ExEngine are the fundamental units of experimental workflows. They represent discrete tasks or operations that can be submitted to the ExecutionEngine for execution. Events provide a flexible and modular way to construct complex experimental workflows, ranging from simple hardware commands to sophisticated multi-step procedures that may involve data analysis.
78

8-
Events in ExEngine are the fundamental units of experimental workflows. They represent discrete tasks or operations that can be submitted to the ExecutionEngine for processing. Events provide a flexible and modular way to construct complex experimental workflows, ranging from simple hardware commands to sophisticated multi-step procedures that may involve data analysis.
9+
ExEngine supports two types of events:
910

10-
The work of an event is fully contained in its execute method. This method can be called directly to run the event on the current thread:
11+
- ``Callable`` objects (methods/functions/lambdas) for simple tasks
12+
- ``ExecutorEvent`` subclasses for complex operations
1113

1214

15+
Simple Events: Callable Objects
16+
-------------------------------
17+
For straightforward tasks, you can submit a callable object directly:
18+
1319
.. code-block:: python
1420
15-
from exengine.events.positioner_events import SetPosition2DEvent
21+
def simple_task():
22+
do_something()
23+
24+
engine.submit(simple_task)
1625
17-
# Create an event
18-
move_event = SetPosition2DEvent(device=xy_stage, position=(10.0, 20.0))
1926
20-
# Execute the event directly on the current thread
21-
move_event.execute()
2227
23-
More commonly, events are submitted to the execution engine to be executed asynchronously:
28+
ExecutorEvent Objects
29+
----------------------
30+
31+
For more complex operations, use ExecutorEvent subclasses. These provide additional capabilities like notifications and data handling:
2432

2533
.. code-block:: python
2634
27-
from exengine import ExecutionEngine
35+
from exengine.events.positioner_events import SetPosition2DEvent
36+
37+
move_event = SetPosition2DEvent(device=xy_stage, position=(10.0, 20.0))
38+
future = engine.submit(move_event)
39+
2840
29-
engine = ExecutionEngine.get_instance()
3041
31-
# Submit the event to the execution engine
42+
.. code-block:: python
43+
44+
# Asynchronous execution
3245
future = engine.submit(move_event)
3346
# This returns immediately, allowing other operations to continue
3447
48+
The power of this approach lies in its ability to separate the definition of what takes place from the details of how it is executed. While the event defines the operation to be performed, the execution engine manages the scheduling and execution of events across multiple threads. This separation allows for complex workflows to be built up from simple, reusable components, while the execution engine manages the details of scheduling execution, and error handling.
3549

36-
37-
The power of this approach lies in its ability to separate the definition of what takes place from the details of how it is executed. While the event defines the operation to be performed, the execution engine manages the scheduling and execution of events across multiple threads. This separation allows for complex workflows to be built up from simple, reusable components, while the execution engine manages the details of scheduling and resource allocation.
50+
A list of available events can be found in the :ref:`standard_events` section.
3851

3952
Monitoring Event Progress
4053
--------------------------

src/exengine/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
version_info = (0, 1, 1)
1+
version_info = (0, 2, 0)
22
__version__ = ".".join(map(str, version_info))

src/exengine/kernel/ex_event_base.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import warnings
2-
from typing import Optional, Any,ClassVar, Type, List, Dict, Union, Iterable
2+
from typing import Optional, Any,ClassVar, Type, List, Dict, Union, Iterable, Callable
33
from abc import ABC, abstractmethod, ABCMeta
44
import weakref
5-
from .notification_base import Notification
5+
import inspect
6+
from functools import partial
67

8+
from .notification_base import Notification
79
from .notification_base import EventExecutedNotification
8-
9-
# if TYPE_CHECKING: # avoid circular imports
1010
from .ex_future import ExecutionFuture
1111

1212

@@ -103,8 +103,30 @@ def _post_execution(self, return_value: Optional[Any] = None, exception: Optiona
103103
if self._future_weakref is None:
104104
raise Exception("Future not set for event")
105105
future = self._future_weakref()
106-
if future is not None:
107-
future._notify_execution_complete(return_value, exception)
108106
self.finished = True
109107
self._engine.publish_notification(EventExecutedNotification(payload=exception))
108+
if future is not None:
109+
future._notify_execution_complete(return_value, exception)
110110

111+
112+
113+
class AnonymousCallableEvent(ExecutorEvent):
114+
"""
115+
An event that wraps a callable object and calls it when the event is executed.
116+
117+
The callable object should take no arguments and optionally return a value.
118+
"""
119+
def __init__(self, callable_obj: Callable[[], Any]):
120+
super().__init__()
121+
self.callable_obj = callable_obj
122+
# Check if the callable has no parameters (except for 'self' in case of methods)
123+
if not callable(callable_obj):
124+
raise TypeError("Callable object must be a function or method")
125+
signature = inspect.signature(callable_obj)
126+
if not all(param.default != param.empty or param.kind == param.VAR_POSITIONAL for param in
127+
signature.parameters.values()):
128+
raise TypeError("Callable object must take no arguments")
129+
130+
131+
def execute(self):
132+
return self.callable_obj()

src/exengine/kernel/ex_future.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,22 @@ def await_data(self, coordinates: Optional[Union[DataCoordinates, Dict[str, Unio
120120
this function is called before the data is acquired, the data may have already been saved and not readily
121121
available in RAM. In this case, the data will be read from disk.
122122
123-
124-
:param coordinates: A single DataCoordinates object/dictionary, or Sequence (i.e. list or tuple) of
125-
DataCoordinates objects/dictionaries. If None, this function will block until the next data is
126-
acquired/processed/saved
127-
:param return_data: whether to return the data
128-
:param return_metadata: whether to return the metadata
129-
:param processed: whether to wait until data has been processed. If not data processor is in use, then this
130-
parameter has no effect
131-
:param stored: whether to wait for data that has been stored. If the call to await data occurs before the data
132-
gets passed off to the storage_backends class, then it will be stored in memory and returned immediately without having to retrieve
123+
Parameters:
124+
------------
125+
coordinates: Union[DataCoordinates, Dict[str, Union[int, str]], DataCoordinatesIterator, Sequence[DataCoordinates], Sequence[Dict[str, Union[int, str]]]
126+
A single DataCoordinates object/dictionary, or Sequence (i.e. list or tuple) of
127+
DataCoordinates objects/dictionaries. If None, this function will block until the next data is
128+
acquired/processed/saved
129+
return_data: bool
130+
whether to return the data
131+
return_metadata: bool
132+
whether to return the metadata
133+
processed: bool
134+
whether to wait until data has been processed. If not data processor is in use, then this
135+
parameter has no effect
136+
stored: bool
137+
whether to wait for data that has been stored. If the call to await data occurs before the data
138+
gets passed off to the storage_backends class, then it will be stored in memory and returned immediately without having to retrieve
133139
"""
134140

135141
coordinates_iterator = DataCoordinatesIterator.create(coordinates)

src/exengine/kernel/executor.py

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
import traceback
99
from typing import Union, Iterable, List, Callable, Any, Type
1010
import queue
11+
import inspect
1112

1213
from .notification_base import Notification, NotificationCategory
13-
from .ex_event_base import ExecutorEvent
14+
from .ex_event_base import ExecutorEvent, AnonymousCallableEvent
1415
from .ex_future import ExecutionFuture
1516

1617
from .data_handler import DataHandler
@@ -178,32 +179,23 @@ def check_exceptions(self):
178179
raise MultipleExceptions(exceptions)
179180

180181
def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], thread_name=None,
181-
transpile: bool = True, prioritize: bool = False, use_free_thread: bool = False,
182-
data_handler: DataHandler = None) -> Union[ExecutionFuture, Iterable[ExecutionFuture]]:
182+
prioritize: bool = False, use_free_thread: bool = False) -> Union[ExecutionFuture, Iterable[ExecutionFuture]]:
183183
"""
184-
Submit one or more acquisition events for execution.
184+
Submit one or more acquisition events or callable objects for execution.
185185
186-
This method handles the submission of acquisition events to be executed on active threads. It provides
187-
options for event prioritization, thread allocation, and performance optimization.
186+
This method handles the submission of acquisition events or callable objects to be executed on active threads.
187+
It provides options for event prioritization, thread allocation, and performance optimization.
188188
189-
Execution Behavior:
190-
- By default, all events are executed on a single thread in submission order to prevent concurrency issues.
191-
- Events can be parallelized across different threads using the 'use_free_thread' parameter.
192-
- Priority execution can be requested using the 'prioritize' parameter.
193189
194190
Parameters:
195191
-----------
196-
event_or_events : Union[ExecutorEvent, Iterable[ExecutorEvent]]
197-
A single ExecutorEvent or an iterable of ExecutorEvents to be submitted.
192+
event_or_events : Union[ExecutorEvent, Iterable[ExecutorEvent], Callable[[], Any], Iterable[Callable[[], Any]]]
193+
A single ExecutorEvent, an iterable of ExecutorEvents, or a callable object with no arguments.
198194
199195
thread_name : str, optional (default=None)
200196
Name of the thread to submit the event to. If None, the thread is determined by the
201197
'use_free_thread' parameter.
202198
203-
transpile : bool, optional (default=True)
204-
If True and multiple events are submitted, attempt to optimize them for better performance.
205-
This may result in events being combined or reorganized.
206-
207199
prioritize : bool, optional (default=False)
208200
If True, execute the event(s) before any others in the queue on its assigned thread.
209201
Useful for system-wide changes affecting other events, like hardware adjustments.
@@ -213,32 +205,37 @@ def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]],
213205
Useful for operations like cancelling or stopping events awaiting signals.
214206
If False, execute on the primary thread.
215207
216-
data_handler : DataHandler, optional (default=None)
217-
Object to handle data and metadata produced by DataProducingExecutorEvents.
218-
219208
Returns:
220209
--------
221210
Union[AcquisitionFuture, Iterable[AcquisitionFuture]]
222-
For a single event: returns a single AcquisitionFuture.
223-
For multiple events: returns an Iterable of AcquisitionFutures.
224-
Note: The number of returned futures may differ from the input if transpilation occurs.
211+
For a single event or callable: returns a single ExecutionFuture.
212+
For multiple events: returns an Iterable of ExecutionFutures.
225213
226214
Notes:
227215
------
228-
- Transpilation may optimize multiple events, potentially altering their number or structure.
229216
- Use 'prioritize' for critical system changes that should occur before other queued events.
230217
- 'use_free_thread' is essential for operations that need to run independently, like cancellation events.
218+
- If a callable object with no arguments is submitted, it will be automatically wrapped in a AnonymousCallableEvent.
231219
"""
232-
if isinstance(event_or_events, ExecutorEvent):
220+
# Auto convert single callable to event
221+
if callable(event_or_events) and len(inspect.signature(event_or_events).parameters) == 0:
222+
event_or_events = AnonymousCallableEvent(event_or_events)
223+
224+
if isinstance(event_or_events, (ExecutorEvent, Callable)):
233225
event_or_events = [event_or_events]
234226

235-
if transpile:
236-
# TODO: transpile events
237-
pass
227+
events = []
228+
for event in event_or_events:
229+
if callable(event):
230+
events.append(AnonymousCallableEvent(event))
231+
elif isinstance(event, ExecutorEvent):
232+
events.append(event)
233+
else:
234+
raise TypeError(f"Invalid event type: {type(event)}. "
235+
f"Expected ExecutorEvent or callable with no arguments.")
238236

239-
futures = tuple(self._submit_single_event(event, thread_name or getattr(event_or_events[0], '_thread_name', None),
240-
use_free_thread, prioritize)
241-
for event in event_or_events)
237+
futures = tuple(self._submit_single_event(event, thread_name or getattr(event, '_thread_name', None),
238+
use_free_thread, prioritize) for event in events)
242239
if len(futures) == 1:
243240
return futures[0]
244241
return futures

src/exengine/kernel/test/test_executor.py

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -222,28 +222,28 @@ def test_device_threadpool_executor(execution_engine):
222222
#######################################################
223223
# Tests for other ExecutionEngine functionalities
224224
#######################################################
225+
226+
class SyncEvent(ExecutorEvent):
227+
228+
def __init__(self, start_event, finish_event):
229+
super().__init__()
230+
self.executed = False
231+
self.executed_time = None
232+
self.execute_count = 0
233+
self.executed_thread_name = None
234+
self.start_event = start_event
235+
self.finish_event = finish_event
236+
237+
def execute(self):
238+
self.executed_thread_name = threading.current_thread().name
239+
self.start_event.set() # Signal that the execution has started
240+
self.finish_event.wait() # Wait for the signal to finish
241+
self.executed_time = time.time()
242+
self.execute_count += 1
243+
self.executed = True
244+
225245
def create_sync_event(start_event, finish_event):
226-
event = MagicMock(spec=ExecutorEvent)
227-
event._finished = False
228-
event._initialized = False
229-
event.num_retries_on_exception = 0
230-
event.executed = False
231-
event.executed_time = None
232-
event.execute_count = 0
233-
event.executed_thread_name = None
234-
event._thread_name = None
235-
236-
def execute():
237-
event.executed_thread_name = threading.current_thread().name
238-
start_event.set() # Signal that the execution has started
239-
finish_event.wait() # Wait for the signal to finish
240-
event.executed_time = time.time()
241-
event.execute_count += 1
242-
event.executed = True
243-
244-
event.execute.side_effect = execute
245-
event._post_execution = MagicMock()
246-
return event
246+
return SyncEvent(start_event, finish_event)
247247

248248

249249
def test_submit_single_event(execution_engine):
@@ -397,6 +397,55 @@ def test_single_execution_with_free_thread(execution_engine):
397397
assert event1.execute_count == 1
398398
assert event2.execute_count == 1
399399

400+
#### Callable submission tests ####
401+
def test_submit_callable(execution_engine):
402+
def simple_function():
403+
return 42
404+
405+
future = execution_engine.submit(simple_function)
406+
result = future.await_execution()
407+
assert result == 42
408+
409+
def test_submit_lambda(execution_engine):
410+
future = execution_engine.submit(lambda: "Hello, World!")
411+
result = future.await_execution()
412+
assert result == "Hello, World!"
413+
414+
def test_class_method(execution_engine):
415+
class TestClass:
416+
def test_method(self):
417+
return "Test method executed"
418+
419+
future = execution_engine.submit(TestClass().test_method)
420+
result = future.await_execution()
421+
assert result == "Test method executed"
422+
423+
def test_submit_mixed(execution_engine):
424+
class TestEvent(ExecutorEvent):
425+
def execute(self):
426+
return "Event executed"
427+
428+
futures = execution_engine.submit([TestEvent(), lambda: 42, lambda: "Lambda"])
429+
results = [future.await_execution() for future in futures]
430+
assert results == ["Event executed", 42, "Lambda"]
431+
432+
def test_submit_invalid(execution_engine):
433+
with pytest.raises(TypeError):
434+
execution_engine.submit(lambda x: x + 1) # Callable with arguments should raise TypeError
435+
436+
with pytest.raises(TypeError):
437+
execution_engine.submit("Not a callable") # Non-callable, non-ExecutorEvent should raise TypeError
438+
439+
def test_callable_event_notification(execution_engine):
440+
notifications = []
441+
execution_engine.subscribe_to_notifications(lambda n: notifications.append(n))
442+
443+
future = execution_engine.submit(lambda: "Test")
444+
result = future.await_execution()
445+
446+
assert result == "Test"
447+
assert len(notifications) == 1
448+
assert notifications[0].__class__.__name__ == "EventExecutedNotification"
400449

401450
#######################################################
402451
# Tests for named thread functionalities ##############

0 commit comments

Comments
 (0)