Skip to content

Commit 714dc21

Browse files
committed
refactor: Store no longer aggregates changes, it now calls listeners with every change
refactor: `SideEffectRunnerThread` now runs async side effects in the event loop of the thread in which it was instantiated in (it used to create its own event loop) refactor(test): `event_loop` fixture now sets the global event loop on setup and restores it on teardown
1 parent 855ab60 commit 714dc21

16 files changed

+102
-93
lines changed

.github/workflows/integration_delivery.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ on:
66
workflow_dispatch:
77

88
env:
9-
PYTHON_VERSION: "3.11"
9+
PYTHON_VERSION: '3.11'
1010

1111
jobs:
1212
dependencies:
@@ -267,7 +267,7 @@ jobs:
267267
path: artifacts
268268

269269
- name: Release
270-
uses: softprops/action-gh-release@v1
270+
uses: softprops/action-gh-release@v2
271271
with:
272272
files: artifacts/*
273273
tag_name: v${{ needs.build.outputs.version }}

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# Changelog
22

3+
## Version 0.14.0
4+
5+
- refactor: `Store` no longer aggregates changes, it now calls listeners with every
6+
change
7+
- refactor: `SideEffectRunnerThread` now runs async side effects in the event loop
8+
of the thread in which it was instantiated in (it used to create its own event
9+
loop)
10+
- refactor(test): `event_loop` fixture now sets the global event loop on setup and
11+
restores it on teardown
12+
313
## Version 0.13.2
414

515
- fix: initial snapshot cleanup which used to mistakenly remove files with store:...

poetry.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "python-redux"
3-
version = "0.13.2"
3+
version = "0.14.0"
44
description = "Redux implementation for Python"
55
authors = ["Sassan Haradji <[email protected]>"]
66
license = "Apache-2.0"
@@ -60,6 +60,7 @@ profile = "black"
6060

6161
[tool.pyright]
6262
exclude = ['typings']
63+
filterwarnings = 'error'
6364

6465
[tool.pytest.ini_options]
6566
log_cli = 1

redux/autorun.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,7 @@ def _check_and_call(
171171
create_task = self._store._create_task # noqa: SLF001
172172
if iscoroutine(self._latest_value) and create_task:
173173
create_task(self._latest_value, callback=self._task_callback)
174-
else:
175-
self.inform_subscribers()
174+
self.inform_subscribers()
176175
else:
177176
self.unsubscribe()
178177

redux/main.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def __init__(
5050
options: CreateStoreOptions[Action, Event] | None = None,
5151
) -> None:
5252
"""Create a new store."""
53+
self.finished = False
5354
self.store_options = options or CreateStoreOptions()
5455
self.reducer = reducer
5556
self._create_task = self.store_options.task_creator
@@ -95,35 +96,37 @@ def __init__(
9596
if self.store_options.scheduler:
9697
self.store_options.scheduler(self.run, interval=True)
9798

99+
def _call_listeners(self: Store[State, Action, Event], state: State) -> None:
100+
for listener_ in self._listeners.copy():
101+
if isinstance(listener_, weakref.ref):
102+
listener = listener_()
103+
if listener is None:
104+
self._listeners.discard(listener_)
105+
continue
106+
else:
107+
listener = listener_
108+
result = listener(state)
109+
if asyncio.iscoroutine(result) and self._create_task:
110+
self._create_task(result)
111+
98112
def _run_actions(self: Store[State, Action, Event]) -> None:
99113
action = self._actions.pop(0)
100114
result = self.reducer(self._state, action)
101115
if is_complete_reducer_result(result):
102116
self._state = result.state
117+
self._call_listeners(self._state)
103118
self.dispatch([*(result.actions or []), *(result.events or [])])
104119
elif is_state_reducer_result(result):
105120
self._state = result
121+
self._call_listeners(self._state)
106122

107123
if isinstance(action, FinishAction):
108124
self.dispatch(cast(Event, FinishEvent()))
109125

110-
if len(self._actions) == 0 and self._state:
111-
for listener_ in self._listeners.copy():
112-
if isinstance(listener_, weakref.ref):
113-
listener = listener_()
114-
if listener is None:
115-
self._listeners.discard(listener_)
116-
continue
117-
else:
118-
listener = listener_
119-
result = listener(self._state)
120-
if asyncio.iscoroutine(result) and self._create_task:
121-
self._create_task(result)
122-
123126
def _run_event_handlers(self: Store[State, Action, Event]) -> None:
124127
event = self._events.pop(0)
125128
for event_handler_ in self._event_handlers[type(event)].copy():
126-
self._event_handlers_queue.put((event_handler_, event))
129+
self._event_handlers_queue.put_nowait((event_handler_, event))
127130

128131
def run(self: Store[State, Action, Event]) -> None:
129132
"""Run the store."""
@@ -134,7 +137,12 @@ def run(self: Store[State, Action, Event]) -> None:
134137

135138
if len(self._events) > 0:
136139
self._run_event_handlers()
137-
if not any(worker.is_alive() for worker in self._workers):
140+
if (
141+
self.finished
142+
and self._actions == []
143+
and self._events == []
144+
and not any(worker.is_alive() for worker in self._workers)
145+
):
138146
self.clean_up()
139147

140148
def clean_up(self: Store[State, Action, Event]) -> None:
@@ -219,7 +227,8 @@ def unsubscribe() -> None:
219227

220228
def _handle_finish_event(self: Store[State, Action, Event]) -> None:
221229
for _ in range(self.store_options.threads):
222-
self._event_handlers_queue.put(None)
230+
self._event_handlers_queue.put_nowait(None)
231+
self.finished = True
223232

224233
def autorun(
225234
self: Store[State, Action, Event],

redux/side_effect_runner.py

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import contextlib
77
import threading
88
import weakref
9-
from asyncio import Task, iscoroutine
9+
from asyncio import Handle, iscoroutine
1010
from inspect import signature
1111
from typing import TYPE_CHECKING, Any, Callable, Generic, cast
1212

@@ -27,16 +27,14 @@ def __init__(
2727
"""Initialize the side effect runner thread."""
2828
super().__init__()
2929
self.task_queue = task_queue
30-
self._tasks: set[Task] = set()
30+
self.loop = asyncio.get_event_loop()
31+
self._handles: set[Handle] = set()
32+
self.create_task = lambda coro: self._handles.add(
33+
self.loop.call_soon_threadsafe(self.loop.create_task, coro),
34+
)
3135

3236
def run(self: SideEffectRunnerThread[Event]) -> None:
3337
"""Run the side effect runner thread."""
34-
self.loop = asyncio.new_event_loop()
35-
self.create_task = lambda coro: self._tasks.add(self.loop.create_task(coro))
36-
self.loop.run_until_complete(self.work())
37-
38-
async def work(self: SideEffectRunnerThread[Event]) -> None:
39-
"""Run the side effects."""
4038
while True:
4139
task = self.task_queue.get()
4240
if task is None:
@@ -61,17 +59,3 @@ async def work(self: SideEffectRunnerThread[Event]) -> None:
6159
self.create_task(result)
6260
finally:
6361
self.task_queue.task_done()
64-
await self.clean_up()
65-
66-
async def clean_up(self: SideEffectRunnerThread[Event]) -> None:
67-
"""Clean up the side effect runner thread."""
68-
while True:
69-
tasks = [
70-
task
71-
for task in asyncio.all_tasks(self.loop)
72-
if task is not asyncio.current_task(self.loop)
73-
]
74-
if not tasks:
75-
break
76-
for task in tasks:
77-
await task

redux_pytest/fixtures/event_loop.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ class LoopThread(threading.Thread):
1212
def __init__(self: LoopThread) -> None:
1313
super().__init__()
1414
self.loop = asyncio.new_event_loop()
15+
asyncio.set_event_loop(self.loop)
1516

1617
def run(self: LoopThread) -> None:
1718
self.loop.run_forever()
1819

1920
def stop(self: LoopThread) -> None:
21+
asyncio.set_event_loop(None)
2022
self.loop.call_soon_threadsafe(self.loop.stop)
2123

2224
def create_task(self: LoopThread, coro: Coroutine) -> None:
23-
self.loop.call_soon_threadsafe(lambda: self.loop.create_task(coro))
25+
self.loop.call_soon_threadsafe(self.loop.create_task, coro)
2426

2527

2628
@pytest.fixture()

redux_pytest/fixtures/snapshot.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ def __init__(
5151
def json_snapshot(self: StoreSnapshot) -> str:
5252
"""Return the snapshot of the current state of the store."""
5353
return (
54-
json.dumps(self.store.snapshot, indent=2, sort_keys=True)
54+
json.dumps(
55+
self.store.snapshot,
56+
indent=2,
57+
sort_keys=True,
58+
ensure_ascii=False,
59+
)
5560
if self.store._state # noqa: SLF001
5661
else ''
5762
)

tests/results/test_features/general/store-subscription-002.jsonc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
{
33
"_id": "e3e70682c2094cac629f6fbed82c07cd",
44
"base10": {
5-
"count": 12
5+
"count": 11
66
},
77
"inverse": {
8-
"count": -1
8+
"count": 0
99
},
1010
"straight": {
11-
"count": 2
11+
"count": 1
1212
}
1313
}

0 commit comments

Comments
 (0)