Skip to content

Commit 1117e62

Browse files
committed
Fix: offload sync tool execution to AnyIO worker thread to prevent event-loop blocking
1 parent 69c8a2f commit 1117e62

File tree

2 files changed

+35
-71
lines changed

2 files changed

+35
-71
lines changed

src/mcp/server/fastmcp/utilities/func_metadata.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import inspect
22
import json
33
from collections.abc import Awaitable, Callable, Sequence
4+
from functools import partial
45
from itertools import chain
56
from types import GenericAlias
67
from typing import Annotated, Any, cast, get_args, get_origin, get_type_hints
@@ -16,7 +17,6 @@
1617
WithJsonSchema,
1718
create_model,
1819
)
19-
from functools import partial
2020
from pydantic.fields import FieldInfo
2121
from pydantic.json_schema import GenerateJsonSchema, JsonSchemaWarningKind
2222
from typing_extensions import is_typeddict
@@ -95,7 +95,7 @@ async def call_fn_with_arg_validation(
9595
if fn_is_async:
9696
return await fn(**arguments_parsed_dict)
9797
else:
98-
await anyio.to_thread.run_sync(partial(fn, **arguments_parsed_dict))
98+
return await anyio.to_thread.run_sync(partial(fn, **arguments_parsed_dict))
9999

100100
def convert_result(self, result: Any) -> Any:
101101
"""

tests/server/fastmcp/test_func_metadata.py

Lines changed: 33 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
# pyright: reportMissingParameterType=false
44
# pyright: reportUnknownArgumentType=false
55
# pyright: reportUnknownLambdaType=false
6-
from collections.abc import Callable
7-
from dataclasses import dataclass
86
import threading
97
import time
8+
from collections.abc import Callable
9+
from dataclasses import dataclass
1010
from typing import Annotated, Any, Final, TypedDict
1111

1212
import annotated_types
@@ -1206,72 +1206,6 @@ def func_with_metadata() -> Annotated[int, Field(gt=1)]: ... # pragma: no branc
12061206
assert meta.output_schema is not None
12071207
assert meta.output_schema["properties"]["result"] == {"exclusiveMinimum": 1, "title": "Result", "type": "integer"}
12081208

1209-
@pytest.mark.anyio
1210-
async def test_sync_function_runs_in_worker_thread():
1211-
"""
1212-
Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync,
1213-
instead of blocking the event loop thread.
1214-
"""
1215-
1216-
def blocking_sync(delay: float) -> int: # pragma: no cover
1217-
# Sleep to simulate a blocking sync tool
1218-
time.sleep(delay)
1219-
# Return the thread ID we are running on
1220-
return threading.get_ident()
1221-
1222-
meta = func_metadata(blocking_sync)
1223-
1224-
# This is the event loop thread ID (where the test itself is running)
1225-
loop_thread_id = threading.get_ident()
1226-
1227-
# Call the sync function through call_fn_with_arg_validation
1228-
result_thread_id = await meta.call_fn_with_arg_validation(
1229-
blocking_sync,
1230-
fn_is_async=False,
1231-
arguments_to_validate={"delay": 0.01},
1232-
arguments_to_pass_directly=None,
1233-
)
1234-
1235-
# The tool should have executed in a different worker thread
1236-
assert result_thread_id != loop_thread_id
1237-
1238-
1239-
@pytest.mark.anyio
1240-
async def test_sync_blocking_tool_does_not_block_event_loop():
1241-
"""
1242-
A blocking synchronous tool (time.sleep) should not prevent other tasks
1243-
on the event loop from running, because it is offloaded to a worker thread.
1244-
"""
1245-
1246-
def blocking_tool(delay: float) -> str: # pragma: no cover
1247-
time.sleep(delay)
1248-
return "done"
1249-
1250-
meta = func_metadata(blocking_tool)
1251-
1252-
flag = {"ran": False}
1253-
1254-
async def run_tool():
1255-
result = await meta.call_fn_with_arg_validation(
1256-
blocking_tool,
1257-
fn_is_async=False,
1258-
arguments_to_validate={"delay": 0.2},
1259-
arguments_to_pass_directly=None,
1260-
)
1261-
assert result == "done"
1262-
1263-
async def concurrent_task():
1264-
# If the event loop is *not* blocked, this will run while the tool sleeps
1265-
await anyio.sleep(0.05)
1266-
flag["ran"] = True
1267-
1268-
async with anyio.create_task_group() as tg:
1269-
tg.start_soon(run_tool)
1270-
tg.start_soon(concurrent_task)
1271-
1272-
# If the sync tool had blocked the event loop, concurrent_task would never
1273-
# have executed and flag["ran"] would still be False.
1274-
assert flag["ran"] is True
12751209

12761210
@pytest.mark.anyio
12771211
async def test_sync_tool_does_not_block_event_loop() -> None:
@@ -1319,4 +1253,34 @@ async def fast_probe() -> None:
13191253
assert fast_probe_elapsed is not None
13201254
# If slow_sync blocks the loop, this will be ~0.30s and fail.
13211255
# If slow_sync is offloaded, this should typically be a few ms.
1322-
assert fast_probe_elapsed < 0.10
1256+
assert fast_probe_elapsed < 0.10
1257+
1258+
1259+
@pytest.mark.anyio
1260+
async def test_sync_function_runs_in_worker_thread():
1261+
"""
1262+
Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync,
1263+
instead of blocking the event loop thread.
1264+
"""
1265+
1266+
def blocking_sync(delay: float) -> int: # pragma: no cover
1267+
# Sleep to simulate a blocking sync tool
1268+
time.sleep(delay)
1269+
# Return the thread ID we are running on
1270+
return threading.get_ident()
1271+
1272+
meta = func_metadata(blocking_sync)
1273+
1274+
# This is the event loop thread ID (where the test itself is running)
1275+
loop_thread_id = threading.get_ident()
1276+
1277+
# Call the sync function through call_fn_with_arg_validation
1278+
result_thread_id = await meta.call_fn_with_arg_validation(
1279+
blocking_sync,
1280+
fn_is_async=False,
1281+
arguments_to_validate={"delay": 0.01},
1282+
arguments_to_pass_directly=None,
1283+
)
1284+
1285+
# The tool should have executed in a different worker thread
1286+
assert result_thread_id != loop_thread_id

0 commit comments

Comments
 (0)