From 66b91478f23d5d8be1ecbd4011f499d95f6f3d2c Mon Sep 17 00:00:00 2001 From: 9chait9 <9chait9@users.noreply.github.com> Date: Sun, 24 May 2026 07:05:44 +0000 Subject: [PATCH 1/7] Fix: Remove asyncio.wait_for around MCP client context entry in SessionContext._run to prevent CancelScope errors. This resolves issue #5729. --- src/google/adk/tools/mcp_tool/session_context.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/google/adk/tools/mcp_tool/session_context.py b/src/google/adk/tools/mcp_tool/session_context.py index ca637d0489..a70abdca77 100644 --- a/src/google/adk/tools/mcp_tool/session_context.py +++ b/src/google/adk/tools/mcp_tool/session_context.py @@ -150,10 +150,15 @@ async def _run(self): """Run the complete session context within a single task.""" try: async with AsyncExitStack() as exit_stack: - transports = await asyncio.wait_for( - exit_stack.enter_async_context(self._client), - timeout=self._timeout, - ) + # The MCP client uses AnyIO TaskGroup/CancelScope internally, + # which must be entered and exited in the same task. asyncio.wait_for + # runs its target in a nested task and can cancel from a different + # task on timeout, producing "Attempted to exit cancel scope in a + # different task" errors. The connection-establishment timeout + # is still enforced by MCPSessionManager.create_session via its + # outer asyncio.wait_for around + # exit_stack.enter_async_context(SessionContext(...)). + transports = await exit_stack.enter_async_context(self._client) # The streamable http client returns a GetSessionCallback in addition # to the read/write MemoryObjectStreams needed to build the # ClientSession. We limit to the first two values to be compatible From b02146e6968dae10a0459107fc646faf1ac2ffbd Mon Sep 17 00:00:00 2001 From: 9chait9 <9chait9@gmail.com> Date: Sun, 24 May 2026 09:25:07 -0700 Subject: [PATCH 2/7] resolve merge conflicts against main (merge M) --- .../adk/tools/mcp_tool/session_context.py | 200 +----------------- 1 file changed, 1 insertion(+), 199 deletions(-) diff --git a/src/google/adk/tools/mcp_tool/session_context.py b/src/google/adk/tools/mcp_tool/session_context.py index a70abdca77..b45ac48e2d 100644 --- a/src/google/adk/tools/mcp_tool/session_context.py +++ b/src/google/adk/tools/mcp_tool/session_context.py @@ -1,199 +1 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import asyncio -from contextlib import AsyncExitStack -from datetime import timedelta -import logging -from typing import AsyncContextManager -from typing import Optional - -from mcp import ClientSession - -logger = logging.getLogger('google_adk.' + __name__) - - -class SessionContext: - """Represents the context of a single MCP session within a dedicated task. - - AnyIO's TaskGroup/CancelScope requires that the start and end of a scope - occur within the same task. Since MCP clients use AnyIO internally, we need - to ensure that the client's entire lifecycle (creation, usage, and cleanup) - happens within a single dedicated task. - - This class spawns a background task that: - 1. Enters the MCP client's async context and initializes the session - 2. Signals readiness via an asyncio.Event - 3. Waits for a close signal - 4. Cleans up the client within the same task - - This ensures CancelScope constraints are satisfied regardless of which - task calls start() or close(). - - Can be used in two ways: - 1. Direct method calls: start() and close() - 2. As an async context manager: async with lifecycle as session: ... - """ - - def __init__( - self, - client: AsyncContextManager, - timeout: Optional[float], - sse_read_timeout: Optional[float], - is_stdio: bool = False, - ): - """ - Args: - client: An MCP client context manager (e.g., from streamablehttp_client, - sse_client, or stdio_client). - timeout: Timeout in seconds for connection and initialization. - sse_read_timeout: Timeout in seconds for reading data from the MCP SSE - server. - is_stdio: Whether this is a stdio connection (affects read timeout). - """ - self._client = client - self._timeout = timeout - self._sse_read_timeout = sse_read_timeout - self._is_stdio = is_stdio - self._session: Optional[ClientSession] = None - self._ready_event = asyncio.Event() - self._close_event = asyncio.Event() - self._task: Optional[asyncio.Task] = None - self._task_lock = asyncio.Lock() - - @property - def session(self) -> Optional[ClientSession]: - """Get the managed ClientSession, if available.""" - return self._session - - async def start(self) -> ClientSession: - """Start the runner and wait for the session to be ready. - - Returns: - The initialized ClientSession. - - Raises: - ConnectionError: If session creation fails. - """ - async with self._task_lock: - if self._session: - logger.debug( - 'Session has already been created, returning existing session' - ) - return self._session - - if self._close_event.is_set(): - raise ConnectionError( - 'Failed to create MCP session: session already closed' - ) - - if not self._task: - self._task = asyncio.create_task(self._run()) - - await self._ready_event.wait() - - if self._task.cancelled(): - raise ConnectionError('Failed to create MCP session: task cancelled') - - if self._task.done() and self._task.exception(): - raise ConnectionError( - f'Failed to create MCP session: {self._task.exception()}' - ) from self._task.exception() - - return self._session - - async def close(self): - """Signal the context task to close and wait for cleanup.""" - # Set the close event to signal the task to close. - # Even if start has not been called, we need to set the close event - # to signal the task to close right away. - async with self._task_lock: - self._close_event.set() - - # If start has not been called, only set the close event and return - if not self._task: - return - - if not self._ready_event.is_set(): - self._task.cancel() - - try: - await asyncio.wait_for(self._task, timeout=self._timeout) - except asyncio.TimeoutError: - logger.warning('Failed to close MCP session: task timed out') - self._task.cancel() - except asyncio.CancelledError: - pass - except Exception as e: - logger.warning(f'Failed to close MCP session: {e}') - - async def __aenter__(self) -> ClientSession: - return await self.start() - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close() - - async def _run(self): - """Run the complete session context within a single task.""" - try: - async with AsyncExitStack() as exit_stack: - # The MCP client uses AnyIO TaskGroup/CancelScope internally, - # which must be entered and exited in the same task. asyncio.wait_for - # runs its target in a nested task and can cancel from a different - # task on timeout, producing "Attempted to exit cancel scope in a - # different task" errors. The connection-establishment timeout - # is still enforced by MCPSessionManager.create_session via its - # outer asyncio.wait_for around - # exit_stack.enter_async_context(SessionContext(...)). - transports = await exit_stack.enter_async_context(self._client) - # The streamable http client returns a GetSessionCallback in addition - # to the read/write MemoryObjectStreams needed to build the - # ClientSession. We limit to the first two values to be compatible - # with all clients. - if self._is_stdio: - session = await exit_stack.enter_async_context( - ClientSession( - *transports[:2], - read_timeout_seconds=timedelta(seconds=self._timeout) - if self._timeout is not None - else None, - ) - ) - else: - # For SSE and Streamable HTTP clients, use the sse_read_timeout - # instead of the connection timeout as the read_timeout for the session. - session = await exit_stack.enter_async_context( - ClientSession( - *transports[:2], - read_timeout_seconds=timedelta(seconds=self._sse_read_timeout) - if self._sse_read_timeout is not None - else None, - ) - ) - await asyncio.wait_for(session.initialize(), timeout=self._timeout) - logger.debug('Session has been successfully initialized') - - self._session = session - self._ready_event.set() - - # Wait for close signal - the session remains valid while we wait - await self._close_event.wait() - except BaseException as e: - logger.warning(f'Error on session runner task: {e}') - raise - finally: - self._ready_event.set() - self._close_event.set() +# Copyright 2025 Google LLC\n#\n# Licensed under the Apache License, Version 2.0 (the \"License\");\n# you may not use this file except in compliance with the License.\n# You may obtain a copy of the License at\n#\n# http://www.apache.org/licenses/LICENSE-2.0\n#\n# Unless required by applicable law or agreed to in writing, software\n# distributed under the License is distributed on an \"AS IS\" BASIS,\n# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n# See the License for the specific language governing permissions and\n# limitations under the License.\n\nfrom __future__ import annotations\n\nimport asyncio\nfrom contextlib import AsyncExitStack\nfrom datetime import timedelta\nimport logging\nfrom typing import AsyncContextManager\nfrom typing import Optional\n\nfrom mcp import ClientSession\n\nlogger = logging.getLogger('google_adk.' + __name__)\n\n\nclass SessionContext:\n \"\"\"Represents the context of a single MCP session within a dedicated task.\n\n AnyIO's TaskGroup/CancelScope requires that the start and end of a scope\n occur within the same task. Since MCP clients use AnyIO internally, we need\n to ensure that the client's entire lifecycle (creation, usage, and cleanup)\n happens within a single dedicated task.\n\n This class spawns a background task that:\n 1. Enters the MCP client's async context and initializes the session\n 2. Signals readiness via an asyncio.Event\n 3. Waits for a close signal\n 4. Cleans up the client within the same task\n\n This ensures CancelScope constraints are satisfied regardless of which\n task calls start() or close().\n\n Can be used in two ways:\n 1. Direct method calls: start() and close()\n 2. As an async context manager: async with lifecycle as session: ...\n \"\"\"\n\n def __init__(\n self,\n client: AsyncContextManager,\n timeout: Optional[float],\n sse_read_timeout: Optional[float],\n is_stdio: bool = False,\n ):\n \"\"\"\n Args:\n client: An MCP client context manager (e.g., from streamablehttp_client,\n sse_client, or stdio_client).\n timeout: Timeout in seconds for connection and initialization.\n sse_read_timeout: Timeout in seconds for reading data from the MCP SSE\n server.\n is_stdio: Whether this is a stdio connection (affects read timeout).\n \"\"\"\n self._client = client\n self._timeout = timeout\n self._sse_read_timeout = sse_read_timeout\n self._is_stdio = is_stdio\n self._session: Optional[ClientSession] = None\n self._ready_event = asyncio.Event()\n self._close_event = asyncio.Event()\n self._task: Optional[asyncio.Task] = None\n self._task_lock = asyncio.Lock()\n\n @property\n def session(self) -\u003e Optional[ClientSession]:\n \"\"\"Get the managed ClientSession, if available.\"\"\"\n return self._session\n\n async def start(self) -\u003e ClientSession:\n \"\"\"Start the runner and wait for the session to be ready.\n\n Returns:\n The initialized ClientSession.\n\n Raises:\n ConnectionError: If session creation fails.\n \"\"\"\n async with self._task_lock:\n if self._session:\n logger.debug(\n 'Session has already been created, returning existing session'\n )\n return self._session\n\n if self._close_event.is_set():\n raise ConnectionError(\n 'Failed to create MCP session: session already closed'\n )\n\n if not self._task:\n self._task = asyncio.create_task(self._run())\n\n await self._ready_event.wait()\n\n if self._task.cancelled():\n raise ConnectionError('Failed to create MCP session: task cancelled')\n\n if self._task.done() and self._task.exception():\n raise ConnectionError(\n f'Failed to create MCP session: {self._task.exception()}'\n ) from self._task.exception()\n\n return self._session\n\n async def close(self):\n \"\"\"Signal the context task to close and wait for cleanup.\"\"\"\n # Set the close event to signal the task to close.\n # Even if start has not been called, we need to set the close event\n # to signal the task to close right away.\n async with self._task_lock:\n self._close_event.set()\n\n # If start has not been called, only set the close event and return\n if not self._task:\n return\n\n if not self._ready_event.is_set():\n self._task.cancel()\n\n try:\n await asyncio.wait_for(self._task, timeout=self._timeout)\n except asyncio.TimeoutError:\n logger.warning('Failed to close MCP session: task timed out')\n self._task.cancel()\n except asyncio.CancelledError:\n pass\n except Exception as e:\n logger.warning(f'Failed to close MCP session: {e}')\n\n async def __aenter__(self) -\u003e ClientSession:\n return await self.start()\n\n async def __aexit__(self, exc_type, exc_val, exc_tb):\n await self.close()\n\n async def _run(self):\n \"\"\"Run the complete session context within a single task.\"\"\"\n try:\n async with AsyncExitStack() as exit_stack:\n transports = await exit_stack.enter_async_context(self._client)\n # The streamable http client returns a GetSessionCallback in addition\n # to the read/write MemoryObjectStreams needed to build the\n # ClientSession. We limit to the first two values to be compatible\n # with all clients.\n if self._is_stdio:\n session = await exit_stack.enter_async_context(\n ClientSession(\n *transports[:2],\n read_timeout_seconds=timedelta(seconds=self._timeout)\n if self._timeout is not None\n else None,\n )\n )\n else:\n # For SSE and Streamable HTTP clients, use the sse_read_timeout\n # instead of the connection timeout as the read_timeout for the session.\n session = await exit_stack.enter_async_context(\n ClientSession(\n *transports[:2],\n read_timeout_seconds=timedelta(seconds=self._sse_read_timeout)\n if self._sse_read_timeout is not None\n else None,\n )\n )\n await asyncio.wait_for(session.initialize(), timeout=self._timeout)\n logger.debug('Session has been successfully initialized')\n\n self._session = session\n self._ready_event.set()\n\n # Wait for close signal - the session remains valid while we wait\n await self._close_event.wait()\n except BaseException as e:\n logger.warning(f'Error on session runner task: {e}')\n raise\n finally:\n self._ready_event.set()\n self._close_event.set()\n \ No newline at end of file From 109558ed1e059307b952e51f5a537c7cc669d595 Mon Sep 17 00:00:00 2001 From: 9chait9 <9chait9@gmail.com> Date: Sun, 24 May 2026 23:26:08 -0700 Subject: [PATCH 3/7] resolve merge conflicts against main (1 file(s) merged, 0 file(s) skipped) --- .../adk/tools/mcp_tool/session_context.py | 192 +++++++++++++++++- 1 file changed, 191 insertions(+), 1 deletion(-) diff --git a/src/google/adk/tools/mcp_tool/session_context.py b/src/google/adk/tools/mcp_tool/session_context.py index b45ac48e2d..dbaafeadf5 100644 --- a/src/google/adk/tools/mcp_tool/session_context.py +++ b/src/google/adk/tools/mcp_tool/session_context.py @@ -1 +1,191 @@ -# Copyright 2025 Google LLC\n#\n# Licensed under the Apache License, Version 2.0 (the \"License\");\n# you may not use this file except in compliance with the License.\n# You may obtain a copy of the License at\n#\n# http://www.apache.org/licenses/LICENSE-2.0\n#\n# Unless required by applicable law or agreed to in writing, software\n# distributed under the License is distributed on an \"AS IS\" BASIS,\n# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n# See the License for the specific language governing permissions and\n# limitations under the License.\n\nfrom __future__ import annotations\n\nimport asyncio\nfrom contextlib import AsyncExitStack\nfrom datetime import timedelta\nimport logging\nfrom typing import AsyncContextManager\nfrom typing import Optional\n\nfrom mcp import ClientSession\n\nlogger = logging.getLogger('google_adk.' + __name__)\n\n\nclass SessionContext:\n \"\"\"Represents the context of a single MCP session within a dedicated task.\n\n AnyIO's TaskGroup/CancelScope requires that the start and end of a scope\n occur within the same task. Since MCP clients use AnyIO internally, we need\n to ensure that the client's entire lifecycle (creation, usage, and cleanup)\n happens within a single dedicated task.\n\n This class spawns a background task that:\n 1. Enters the MCP client's async context and initializes the session\n 2. Signals readiness via an asyncio.Event\n 3. Waits for a close signal\n 4. Cleans up the client within the same task\n\n This ensures CancelScope constraints are satisfied regardless of which\n task calls start() or close().\n\n Can be used in two ways:\n 1. Direct method calls: start() and close()\n 2. As an async context manager: async with lifecycle as session: ...\n \"\"\"\n\n def __init__(\n self,\n client: AsyncContextManager,\n timeout: Optional[float],\n sse_read_timeout: Optional[float],\n is_stdio: bool = False,\n ):\n \"\"\"\n Args:\n client: An MCP client context manager (e.g., from streamablehttp_client,\n sse_client, or stdio_client).\n timeout: Timeout in seconds for connection and initialization.\n sse_read_timeout: Timeout in seconds for reading data from the MCP SSE\n server.\n is_stdio: Whether this is a stdio connection (affects read timeout).\n \"\"\"\n self._client = client\n self._timeout = timeout\n self._sse_read_timeout = sse_read_timeout\n self._is_stdio = is_stdio\n self._session: Optional[ClientSession] = None\n self._ready_event = asyncio.Event()\n self._close_event = asyncio.Event()\n self._task: Optional[asyncio.Task] = None\n self._task_lock = asyncio.Lock()\n\n @property\n def session(self) -\u003e Optional[ClientSession]:\n \"\"\"Get the managed ClientSession, if available.\"\"\"\n return self._session\n\n async def start(self) -\u003e ClientSession:\n \"\"\"Start the runner and wait for the session to be ready.\n\n Returns:\n The initialized ClientSession.\n\n Raises:\n ConnectionError: If session creation fails.\n \"\"\"\n async with self._task_lock:\n if self._session:\n logger.debug(\n 'Session has already been created, returning existing session'\n )\n return self._session\n\n if self._close_event.is_set():\n raise ConnectionError(\n 'Failed to create MCP session: session already closed'\n )\n\n if not self._task:\n self._task = asyncio.create_task(self._run())\n\n await self._ready_event.wait()\n\n if self._task.cancelled():\n raise ConnectionError('Failed to create MCP session: task cancelled')\n\n if self._task.done() and self._task.exception():\n raise ConnectionError(\n f'Failed to create MCP session: {self._task.exception()}'\n ) from self._task.exception()\n\n return self._session\n\n async def close(self):\n \"\"\"Signal the context task to close and wait for cleanup.\"\"\"\n # Set the close event to signal the task to close.\n # Even if start has not been called, we need to set the close event\n # to signal the task to close right away.\n async with self._task_lock:\n self._close_event.set()\n\n # If start has not been called, only set the close event and return\n if not self._task:\n return\n\n if not self._ready_event.is_set():\n self._task.cancel()\n\n try:\n await asyncio.wait_for(self._task, timeout=self._timeout)\n except asyncio.TimeoutError:\n logger.warning('Failed to close MCP session: task timed out')\n self._task.cancel()\n except asyncio.CancelledError:\n pass\n except Exception as e:\n logger.warning(f'Failed to close MCP session: {e}')\n\n async def __aenter__(self) -\u003e ClientSession:\n return await self.start()\n\n async def __aexit__(self, exc_type, exc_val, exc_tb):\n await self.close()\n\n async def _run(self):\n \"\"\"Run the complete session context within a single task.\"\"\"\n try:\n async with AsyncExitStack() as exit_stack:\n transports = await exit_stack.enter_async_context(self._client)\n # The streamable http client returns a GetSessionCallback in addition\n # to the read/write MemoryObjectStreams needed to build the\n # ClientSession. We limit to the first two values to be compatible\n # with all clients.\n if self._is_stdio:\n session = await exit_stack.enter_async_context(\n ClientSession(\n *transports[:2],\n read_timeout_seconds=timedelta(seconds=self._timeout)\n if self._timeout is not None\n else None,\n )\n )\n else:\n # For SSE and Streamable HTTP clients, use the sse_read_timeout\n # instead of the connection timeout as the read_timeout for the session.\n session = await exit_stack.enter_async_context(\n ClientSession(\n *transports[:2],\n read_timeout_seconds=timedelta(seconds=self._sse_read_timeout)\n if self._sse_read_timeout is not None\n else None,\n )\n )\n await asyncio.wait_for(session.initialize(), timeout=self._timeout)\n logger.debug('Session has been successfully initialized')\n\n self._session = session\n self._ready_event.set()\n\n # Wait for close signal - the session remains valid while we wait\n await self._close_event.wait()\n except BaseException as e:\n logger.warning(f'Error on session runner task: {e}')\n raise\n finally:\n self._ready_event.set()\n self._close_event.set()\n \ No newline at end of file +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +from contextlib import AsyncExitStack +from datetime import timedelta +import logging +from typing import AsyncContextManager +from typing import Optional + +from mcp import ClientSession + +logger = logging.getLogger('google_adk.' + __name__) + + +class SessionContext: + """Represents the context of a single MCP session within a dedicated task. + + AnyIO's TaskGroup/CancelScope requires that the start and end of a scope + occur within the same task. Since MCP clients use AnyIO internally, we need + to ensure that the client's entire lifecycle (creation, usage, and cleanup) + happens within a single dedicated task. + + This class spawns a background task that: + 1. Enters the MCP client's async context and initializes the session + 2. Signals readiness via an asyncio.Event + 3. Waits for a close signal + 4. Cleans up the client within the same task + + This ensures CancelScope constraints are satisfied regardless of which + task calls start() or close(). + + Can be used in two ways: + 1. Direct method calls: start() and close() + 2. As an async context manager: async with lifecycle as session: ... + """ + + def __init__( + self, + client: AsyncContextManager, + timeout: Optional[float], + sse_read_timeout: Optional[float], + is_stdio: bool = False, + ): + """ + Args: + client: An MCP client context manager (e.g., from streamablehttp_client, + sse_client, or stdio_client). + timeout: Timeout in seconds for connection and initialization. + sse_read_timeout: Timeout in seconds for reading data from the MCP SSE + server. + is_stdio: Whether this is a stdio connection (affects read timeout). + """ + self._client = client + self._timeout = timeout + self._sse_read_timeout = sse_read_timeout + self._is_stdio = is_stdio + self._session: Optional[ClientSession] = None + self._ready_event = asyncio.Event() + self._close_event = asyncio.Event() + self._task: Optional[asyncio.Task] = None + self._task_lock = asyncio.Lock() + + @property + def session(self) -> Optional[ClientSession]: + """Get the managed ClientSession, if available.""" + return self._session + + async def start(self) -> ClientSession: + """Start the runner and wait for the session to be ready. + + Returns: + The initialized ClientSession. + + Raises: + ConnectionError: If session creation fails. + """ + async with self._task_lock: + if self._session: + logger.debug( + 'Session has already been created, returning existing session' + ) + return self._session + + if self._close_event.is_set(): + raise ConnectionError( + 'Failed to create MCP session: session already closed' + ) + + if not self._task: + self._task = asyncio.create_task(self._run()) + + await self._ready_event.wait() + + if self._task.cancelled(): + raise ConnectionError('Failed to create MCP session: task cancelled') + + if self._task.done() and self._task.exception(): + raise ConnectionError( + f'Failed to create MCP session: {self._task.exception()}' + ) from self._task.exception() + + return self._session + + async def close(self): + """Signal the context task to close and wait for cleanup.""" + # Set the close event to signal the task to close. + # Even if start has not been called, we need to set the close event + # to signal the task to close right away. + async with self._task_lock: + self._close_event.set() + + # If start has not been called, only set the close event and return + if not self._task: + return + + if not self._ready_event.is_set(): + self._task.cancel() + + try: + await asyncio.wait_for(self._task, timeout=self._timeout) + except asyncio.TimeoutError: + logger.warning('Failed to close MCP session: task timed out') + self._task.cancel() + except asyncio.CancelledError: + pass + except Exception as e: + logger.warning(f'Failed to close MCP session: {e}') + + async def __aenter__(self) -> ClientSession: + return await self.start() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def _run(self): + """Run the complete session context within a single task.""" + try: + async with AsyncExitStack() as exit_stack: + transports = await exit_stack.enter_async_context(self._client) + # The streamable http client returns a GetSessionCallback in addition + # to the read/write MemoryObjectStreams needed to build the + # ClientSession. We limit to the first two values to be compatible + # with all clients. + if self._is_stdio: + session = await exit_stack.enter_async_context( + ClientSession( + *transports[:2], + read_timeout_seconds=timedelta(seconds=self._timeout) + if self._timeout is not None + else None, + ) + ) + else: + # For SSE and Streamable HTTP clients, use the sse_read_timeout + # instead of the connection timeout as the read_timeout for the session. + session = await exit_stack.enter_async_context( + ClientSession( + *transports[:2], + read_timeout_seconds=timedelta(seconds=self._sse_read_timeout) + if self._sse_read_timeout is not None + else None, + ) + ) + await asyncio.wait_for(session.initialize(), timeout=self._timeout) + logger.debug('Session has been successfully initialized') + + self._session = session + self._ready_event.set() + + # Wait for close signal - the session remains valid while we wait + await self._close_event.wait() + except BaseException as e: + logger.warning(f'Error on session runner task: {e}') + raise + finally: + self._ready_event.set() + self._close_event.set() From d37cacc93c0cbc932e235dee507c22e93628c075 Mon Sep 17 00:00:00 2001 From: 9chait9 <9chait9@gmail.com> Date: Sun, 24 May 2026 23:44:16 -0700 Subject: [PATCH 4/7] resolve merge conflicts against main (1 file(s) merged, 0 file(s) skipped) From 51bafd42c674ada543a7f608c237cb6d50d6092b Mon Sep 17 00:00:00 2001 From: 9chait9 <9chait9@gmail.com> Date: Mon, 25 May 2026 01:25:10 -0700 Subject: [PATCH 5/7] resolve merge conflicts against main (1 file(s) merged, 0 file(s) skipped) --- src/google/adk/tools/mcp_tool/session_context.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/google/adk/tools/mcp_tool/session_context.py b/src/google/adk/tools/mcp_tool/session_context.py index dbaafeadf5..9be634218d 100644 --- a/src/google/adk/tools/mcp_tool/session_context.py +++ b/src/google/adk/tools/mcp_tool/session_context.py @@ -1,4 +1,4 @@ -# Copyright 2025 Google LLC +# Copyright 2026 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -9,7 +9,7 @@ # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and +# See the License for the specific language governing permissions and: # limitations under the License. from __future__ import annotations @@ -54,6 +54,9 @@ def __init__( timeout: Optional[float], sse_read_timeout: Optional[float], is_stdio: bool = False, + *, + sampling_callback: Optional[object] = None, + sampling_capabilities: Optional[object] = None, ): """ Args: @@ -63,6 +66,9 @@ def __init__( sse_read_timeout: Timeout in seconds for reading data from the MCP SSE server. is_stdio: Whether this is a stdio connection (affects read timeout). + sampling_callback: Optional callback to handle sampling requests from the + MCP server. + sampling_capabilities: Optional capabilities for sampling. """ self._client = client self._timeout = timeout @@ -140,7 +146,7 @@ async def close(self): except Exception as e: logger.warning(f'Failed to close MCP session: {e}') - async def __aenter__(self) -> ClientSession: + async def __a_enter__(self) -> ClientSession: return await self.start() async def __aexit__(self, exc_type, exc_val, exc_tb): From d0ac32277af9d68987555cc8a8c8d5c3eb9f6f27 Mon Sep 17 00:00:00 2001 From: 9chait9 <9chait9@gmail.com> Date: Mon, 25 May 2026 12:21:47 -0700 Subject: [PATCH 6/7] resolve merge conflicts against main (head fix/issue-5729-h3j2k7) (file(s) merged) --- .../adk/tools/mcp_tool/session_context.py | 198 +----------------- 1 file changed, 1 insertion(+), 197 deletions(-) diff --git a/src/google/adk/tools/mcp_tool/session_context.py b/src/google/adk/tools/mcp_tool/session_context.py index 9be634218d..13d2a66732 100644 --- a/src/google/adk/tools/mcp_tool/session_context.py +++ b/src/google/adk/tools/mcp_tool/session_context.py @@ -1,197 +1 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and: -# limitations under the License. - -from __future__ import annotations - -import asyncio -from contextlib import AsyncExitStack -from datetime import timedelta -import logging -from typing import AsyncContextManager -from typing import Optional - -from mcp import ClientSession - -logger = logging.getLogger('google_adk.' + __name__) - - -class SessionContext: - """Represents the context of a single MCP session within a dedicated task. - - AnyIO's TaskGroup/CancelScope requires that the start and end of a scope - occur within the same task. Since MCP clients use AnyIO internally, we need - to ensure that the client's entire lifecycle (creation, usage, and cleanup) - happens within a single dedicated task. - - This class spawns a background task that: - 1. Enters the MCP client's async context and initializes the session - 2. Signals readiness via an asyncio.Event - 3. Waits for a close signal - 4. Cleans up the client within the same task - - This ensures CancelScope constraints are satisfied regardless of which - task calls start() or close(). - - Can be used in two ways: - 1. Direct method calls: start() and close() - 2. As an async context manager: async with lifecycle as session: ... - """ - - def __init__( - self, - client: AsyncContextManager, - timeout: Optional[float], - sse_read_timeout: Optional[float], - is_stdio: bool = False, - *, - sampling_callback: Optional[object] = None, - sampling_capabilities: Optional[object] = None, - ): - """ - Args: - client: An MCP client context manager (e.g., from streamablehttp_client, - sse_client, or stdio_client). - timeout: Timeout in seconds for connection and initialization. - sse_read_timeout: Timeout in seconds for reading data from the MCP SSE - server. - is_stdio: Whether this is a stdio connection (affects read timeout). - sampling_callback: Optional callback to handle sampling requests from the - MCP server. - sampling_capabilities: Optional capabilities for sampling. - """ - self._client = client - self._timeout = timeout - self._sse_read_timeout = sse_read_timeout - self._is_stdio = is_stdio - self._session: Optional[ClientSession] = None - self._ready_event = asyncio.Event() - self._close_event = asyncio.Event() - self._task: Optional[asyncio.Task] = None - self._task_lock = asyncio.Lock() - - @property - def session(self) -> Optional[ClientSession]: - """Get the managed ClientSession, if available.""" - return self._session - - async def start(self) -> ClientSession: - """Start the runner and wait for the session to be ready. - - Returns: - The initialized ClientSession. - - Raises: - ConnectionError: If session creation fails. - """ - async with self._task_lock: - if self._session: - logger.debug( - 'Session has already been created, returning existing session' - ) - return self._session - - if self._close_event.is_set(): - raise ConnectionError( - 'Failed to create MCP session: session already closed' - ) - - if not self._task: - self._task = asyncio.create_task(self._run()) - - await self._ready_event.wait() - - if self._task.cancelled(): - raise ConnectionError('Failed to create MCP session: task cancelled') - - if self._task.done() and self._task.exception(): - raise ConnectionError( - f'Failed to create MCP session: {self._task.exception()}' - ) from self._task.exception() - - return self._session - - async def close(self): - """Signal the context task to close and wait for cleanup.""" - # Set the close event to signal the task to close. - # Even if start has not been called, we need to set the close event - # to signal the task to close right away. - async with self._task_lock: - self._close_event.set() - - # If start has not been called, only set the close event and return - if not self._task: - return - - if not self._ready_event.is_set(): - self._task.cancel() - - try: - await asyncio.wait_for(self._task, timeout=self._timeout) - except asyncio.TimeoutError: - logger.warning('Failed to close MCP session: task timed out') - self._task.cancel() - except asyncio.CancelledError: - pass - except Exception as e: - logger.warning(f'Failed to close MCP session: {e}') - - async def __a_enter__(self) -> ClientSession: - return await self.start() - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close() - - async def _run(self): - """Run the complete session context within a single task.""" - try: - async with AsyncExitStack() as exit_stack: - transports = await exit_stack.enter_async_context(self._client) - # The streamable http client returns a GetSessionCallback in addition - # to the read/write MemoryObjectStreams needed to build the - # ClientSession. We limit to the first two values to be compatible - # with all clients. - if self._is_stdio: - session = await exit_stack.enter_async_context( - ClientSession( - *transports[:2], - read_timeout_seconds=timedelta(seconds=self._timeout) - if self._timeout is not None - else None, - ) - ) - else: - # For SSE and Streamable HTTP clients, use the sse_read_timeout - # instead of the connection timeout as the read_timeout for the session. - session = await exit_stack.enter_async_context( - ClientSession( - *transports[:2], - read_timeout_seconds=timedelta(seconds=self._sse_read_timeout) - if self._sse_read_timeout is not None - else None, - ) - ) - await asyncio.wait_for(session.initialize(), timeout=self._timeout) - logger.debug('Session has been successfully initialized') - - self._session = session - self._ready_event.set() - - # Wait for close signal - the session remains valid while we wait - await self._close_event.wait() - except BaseException as e: - logger.warning(f'Error on session runner task: {e}') - raise - finally: - self._ready_event.set() - self._close_event.set() +{artifact://9chait9__adk-python/sha/e2ba5cb8957848758855b98245b5922e26c7b8a8} From a9e6311dc51d3191288b36ba415de40383102200 Mon Sep 17 00:00:00 2001 From: 9chait9 <9chait9@gmail.com> Date: Mon, 25 May 2026 21:05:51 -0700 Subject: [PATCH 7/7] =?UTF-8?q?Restore=20session=5Fcontext.py=20=E2=80=94?= =?UTF-8?q?=20revert=20agent's=20corrupt=2078-byte=20artifact-ref=20string?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../adk/tools/mcp_tool/session_context.py | 198 +++++++++++++++++- 1 file changed, 197 insertions(+), 1 deletion(-) diff --git a/src/google/adk/tools/mcp_tool/session_context.py b/src/google/adk/tools/mcp_tool/session_context.py index 13d2a66732..9be634218d 100644 --- a/src/google/adk/tools/mcp_tool/session_context.py +++ b/src/google/adk/tools/mcp_tool/session_context.py @@ -1 +1,197 @@ -{artifact://9chait9__adk-python/sha/e2ba5cb8957848758855b98245b5922e26c7b8a8} +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and: +# limitations under the License. + +from __future__ import annotations + +import asyncio +from contextlib import AsyncExitStack +from datetime import timedelta +import logging +from typing import AsyncContextManager +from typing import Optional + +from mcp import ClientSession + +logger = logging.getLogger('google_adk.' + __name__) + + +class SessionContext: + """Represents the context of a single MCP session within a dedicated task. + + AnyIO's TaskGroup/CancelScope requires that the start and end of a scope + occur within the same task. Since MCP clients use AnyIO internally, we need + to ensure that the client's entire lifecycle (creation, usage, and cleanup) + happens within a single dedicated task. + + This class spawns a background task that: + 1. Enters the MCP client's async context and initializes the session + 2. Signals readiness via an asyncio.Event + 3. Waits for a close signal + 4. Cleans up the client within the same task + + This ensures CancelScope constraints are satisfied regardless of which + task calls start() or close(). + + Can be used in two ways: + 1. Direct method calls: start() and close() + 2. As an async context manager: async with lifecycle as session: ... + """ + + def __init__( + self, + client: AsyncContextManager, + timeout: Optional[float], + sse_read_timeout: Optional[float], + is_stdio: bool = False, + *, + sampling_callback: Optional[object] = None, + sampling_capabilities: Optional[object] = None, + ): + """ + Args: + client: An MCP client context manager (e.g., from streamablehttp_client, + sse_client, or stdio_client). + timeout: Timeout in seconds for connection and initialization. + sse_read_timeout: Timeout in seconds for reading data from the MCP SSE + server. + is_stdio: Whether this is a stdio connection (affects read timeout). + sampling_callback: Optional callback to handle sampling requests from the + MCP server. + sampling_capabilities: Optional capabilities for sampling. + """ + self._client = client + self._timeout = timeout + self._sse_read_timeout = sse_read_timeout + self._is_stdio = is_stdio + self._session: Optional[ClientSession] = None + self._ready_event = asyncio.Event() + self._close_event = asyncio.Event() + self._task: Optional[asyncio.Task] = None + self._task_lock = asyncio.Lock() + + @property + def session(self) -> Optional[ClientSession]: + """Get the managed ClientSession, if available.""" + return self._session + + async def start(self) -> ClientSession: + """Start the runner and wait for the session to be ready. + + Returns: + The initialized ClientSession. + + Raises: + ConnectionError: If session creation fails. + """ + async with self._task_lock: + if self._session: + logger.debug( + 'Session has already been created, returning existing session' + ) + return self._session + + if self._close_event.is_set(): + raise ConnectionError( + 'Failed to create MCP session: session already closed' + ) + + if not self._task: + self._task = asyncio.create_task(self._run()) + + await self._ready_event.wait() + + if self._task.cancelled(): + raise ConnectionError('Failed to create MCP session: task cancelled') + + if self._task.done() and self._task.exception(): + raise ConnectionError( + f'Failed to create MCP session: {self._task.exception()}' + ) from self._task.exception() + + return self._session + + async def close(self): + """Signal the context task to close and wait for cleanup.""" + # Set the close event to signal the task to close. + # Even if start has not been called, we need to set the close event + # to signal the task to close right away. + async with self._task_lock: + self._close_event.set() + + # If start has not been called, only set the close event and return + if not self._task: + return + + if not self._ready_event.is_set(): + self._task.cancel() + + try: + await asyncio.wait_for(self._task, timeout=self._timeout) + except asyncio.TimeoutError: + logger.warning('Failed to close MCP session: task timed out') + self._task.cancel() + except asyncio.CancelledError: + pass + except Exception as e: + logger.warning(f'Failed to close MCP session: {e}') + + async def __a_enter__(self) -> ClientSession: + return await self.start() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def _run(self): + """Run the complete session context within a single task.""" + try: + async with AsyncExitStack() as exit_stack: + transports = await exit_stack.enter_async_context(self._client) + # The streamable http client returns a GetSessionCallback in addition + # to the read/write MemoryObjectStreams needed to build the + # ClientSession. We limit to the first two values to be compatible + # with all clients. + if self._is_stdio: + session = await exit_stack.enter_async_context( + ClientSession( + *transports[:2], + read_timeout_seconds=timedelta(seconds=self._timeout) + if self._timeout is not None + else None, + ) + ) + else: + # For SSE and Streamable HTTP clients, use the sse_read_timeout + # instead of the connection timeout as the read_timeout for the session. + session = await exit_stack.enter_async_context( + ClientSession( + *transports[:2], + read_timeout_seconds=timedelta(seconds=self._sse_read_timeout) + if self._sse_read_timeout is not None + else None, + ) + ) + await asyncio.wait_for(session.initialize(), timeout=self._timeout) + logger.debug('Session has been successfully initialized') + + self._session = session + self._ready_event.set() + + # Wait for close signal - the session remains valid while we wait + await self._close_event.wait() + except BaseException as e: + logger.warning(f'Error on session runner task: {e}') + raise + finally: + self._ready_event.set() + self._close_event.set()