Skip to content

Commit b1887fb

Browse files
refactor(event_handler): extract async logic
1 parent 8673ede commit b1887fb

File tree

2 files changed

+109
-66
lines changed

2 files changed

+109
-66
lines changed

aws_lambda_powertools/event_handler/http_resolver.py

Lines changed: 2 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
from __future__ import annotations
22

3-
import asyncio
43
import base64
54
import inspect
6-
import threading
75
import warnings
86
from typing import TYPE_CHECKING, Any, Callable
97
from urllib.parse import parse_qs
@@ -15,6 +13,7 @@
1513
Response,
1614
Route,
1715
)
16+
from aws_lambda_powertools.event_handler.middlewares.async_utils import wrap_middleware_async
1817
from aws_lambda_powertools.shared.headers_serializer import BaseHeadersSerializer
1918
from aws_lambda_powertools.utilities.data_classes.common import BaseProxyEvent
2019

@@ -320,73 +319,10 @@ async def final_handler(app):
320319
next_handler = final_handler
321320

322321
for middleware in reversed(all_middlewares):
323-
next_handler = self._wrap_middleware_async(middleware, next_handler)
322+
next_handler = wrap_middleware_async(middleware, next_handler)
324323

325324
return await next_handler(self)
326325

327-
def _wrap_middleware_async(self, middleware: Callable, next_handler: Callable) -> Callable:
328-
"""Wrap a middleware to work in async context.
329-
330-
For sync middlewares, we split execution into pre/post phases around the
331-
call to next(). The sync middleware runs its pre-processing (e.g. request
332-
validation), then we intercept the next() call, await the async handler,
333-
and resume the middleware with the real response so post-processing
334-
(e.g. response validation) sees the actual data.
335-
"""
336-
337-
async def wrapped(app):
338-
if inspect.iscoroutinefunction(middleware):
339-
return await middleware(app, next_handler)
340-
341-
# We use an Event to coordinate: the sync middleware runs in a thread,
342-
# calls sync_next which signals us to resolve the async handler,
343-
# then waits for the real response.
344-
middleware_called_next = asyncio.Event()
345-
next_app_holder: list = []
346-
real_response_holder: list = []
347-
middleware_result_holder: list = []
348-
middleware_error_holder: list = []
349-
350-
def sync_next(app):
351-
next_app_holder.append(app)
352-
middleware_called_next.set()
353-
# Block this thread until the real response is available
354-
event = threading.Event()
355-
next_app_holder.append(event)
356-
event.wait()
357-
return real_response_holder[0]
358-
359-
def run_middleware():
360-
try:
361-
result = middleware(app, sync_next)
362-
middleware_result_holder.append(result)
363-
except Exception as e:
364-
middleware_error_holder.append(e)
365-
366-
thread = threading.Thread(target=run_middleware, daemon=True)
367-
thread.start()
368-
369-
# Wait for the middleware to call next()
370-
await middleware_called_next.wait()
371-
372-
# Now resolve the async next_handler
373-
real_response = await next_handler(next_app_holder[0])
374-
real_response_holder.append(real_response)
375-
376-
# Signal the thread that the response is ready
377-
threading_event = next_app_holder[1]
378-
threading_event.set()
379-
380-
# Wait for the middleware thread to finish
381-
thread.join()
382-
383-
if middleware_error_holder:
384-
raise middleware_error_holder[0]
385-
386-
return middleware_result_holder[0]
387-
388-
return wrapped
389-
390326
async def _handle_not_found_async(self) -> dict:
391327
"""Handle 404 responses, using custom not_found handler if registered."""
392328
from http import HTTPStatus
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""Async middleware utilities for bridging sync and async middleware execution."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import inspect
7+
import threading
8+
from typing import TYPE_CHECKING, Any
9+
10+
if TYPE_CHECKING:
11+
from collections.abc import Callable
12+
13+
from aws_lambda_powertools.event_handler.api_gateway import ApiGatewayResolver, Response
14+
15+
16+
def wrap_middleware_async(middleware: Callable, next_handler: Callable) -> Callable:
17+
"""Wrap a middleware to work in an async context.
18+
19+
For async middlewares, delegates directly with ``await``.
20+
21+
For sync middlewares, runs the middleware in a background thread and uses
22+
``asyncio.Event`` / ``threading.Event`` to coordinate the ``next()`` call
23+
so the async handler can be awaited on the main event-loop while the sync
24+
middleware blocks its own thread waiting for the result.
25+
26+
Parameters
27+
----------
28+
middleware : Callable
29+
A sync or async middleware ``(app, next_middleware) -> Response``.
30+
next_handler : Callable
31+
The next (async) handler in the chain.
32+
33+
Returns
34+
-------
35+
Callable
36+
An async callable ``(app) -> Response`` that executes *middleware*
37+
followed by *next_handler*.
38+
"""
39+
40+
async def wrapped(app: ApiGatewayResolver) -> Response:
41+
if inspect.iscoroutinefunction(middleware):
42+
return await middleware(app, next_handler)
43+
44+
return await _run_sync_middleware_in_thread(middleware, next_handler, app)
45+
46+
return wrapped
47+
48+
49+
async def _run_sync_middleware_in_thread(
50+
middleware: Callable,
51+
next_handler: Callable,
52+
app: Any,
53+
) -> Any:
54+
"""Execute a **sync** middleware inside a daemon thread.
55+
56+
The sync middleware calls ``sync_next(app)`` which:
57+
58+
1. Signals the async side that the middleware is ready for the next handler.
59+
2. Blocks the thread until the async handler has produced a response.
60+
3. Returns the response so the middleware can do post-processing.
61+
62+
Meanwhile the async side awaits *next_handler*, feeds the response back,
63+
and waits for the thread to finish.
64+
"""
65+
middleware_called_next = asyncio.Event()
66+
next_app_holder: list = []
67+
real_response_holder: list = []
68+
middleware_result_holder: list = []
69+
middleware_error_holder: list = []
70+
71+
def sync_next(app: Any) -> Any:
72+
next_app_holder.append(app)
73+
middleware_called_next.set()
74+
# Block this thread until the async handler resolves
75+
event = threading.Event()
76+
next_app_holder.append(event)
77+
event.wait()
78+
return real_response_holder[0]
79+
80+
def run_middleware() -> None:
81+
try:
82+
result = middleware(app, sync_next)
83+
middleware_result_holder.append(result)
84+
except Exception as e:
85+
middleware_error_holder.append(e)
86+
87+
thread = threading.Thread(target=run_middleware, daemon=True)
88+
thread.start()
89+
90+
# Wait for the middleware to call next()
91+
await middleware_called_next.wait()
92+
93+
# Resolve the async next_handler on the event-loop
94+
real_response = await next_handler(next_app_holder[0])
95+
real_response_holder.append(real_response)
96+
97+
# Unblock the middleware thread
98+
threading_event = next_app_holder[1]
99+
threading_event.set()
100+
101+
# Wait for the middleware thread to complete post-processing
102+
thread.join()
103+
104+
if middleware_error_holder:
105+
raise middleware_error_holder[0]
106+
107+
return middleware_result_holder[0]

0 commit comments

Comments
 (0)