From 4609e8b0aefb65fa5b1ca4d425257dd91e9bae43 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 5 Jun 2025 13:21:51 -0400 Subject: [PATCH 1/5] PYTHON-5406 - AsyncPeriodicExecutor must reset CSOT contextvars before executing --- pymongo/_asyncio_task.py | 1 + pymongo/_csot.py | 6 ++++++ pymongo/periodic_executor.py | 3 +++ 3 files changed, 10 insertions(+) diff --git a/pymongo/_asyncio_task.py b/pymongo/_asyncio_task.py index 7a528f027d..4c35b3d724 100644 --- a/pymongo/_asyncio_task.py +++ b/pymongo/_asyncio_task.py @@ -43,6 +43,7 @@ def cancelling(self) -> int: return self._cancel_requests +# We can directly pass an empty Context() object to create_task in Python >= 3.11 def create_task(coro: Coroutine[Any, Any, Any], *, name: Optional[str] = None) -> asyncio.Task: if sys.version_info >= (3, 11): return asyncio.create_task(coro, name=name) diff --git a/pymongo/_csot.py b/pymongo/_csot.py index 06c6b68ac9..c5681e345a 100644 --- a/pymongo/_csot.py +++ b/pymongo/_csot.py @@ -32,6 +32,12 @@ DEADLINE: ContextVar[float] = ContextVar("DEADLINE", default=float("inf")) +def reset_all() -> None: + TIMEOUT.set(None) + RTT.set(0.0) + DEADLINE.set(float("inf")) + + def get_timeout() -> Optional[float]: return TIMEOUT.get(None) diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index 323debdce2..ed369a2b21 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -23,6 +23,7 @@ import weakref from typing import Any, Optional +from pymongo import _csot from pymongo._asyncio_task import create_task from pymongo.lock import _create_lock @@ -93,6 +94,8 @@ def skip_sleep(self) -> None: self._skip_sleep = True async def _run(self) -> None: + # The CSOT contextvars must be cleared inside the executor task before execution begins + _csot.reset_all() while not self._stopped: if self._task and self._task.cancelling(): # type: ignore[unused-ignore, attr-defined] raise asyncio.CancelledError From 28003128be463ad665911f77e91525e44adb156c Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 5 Jun 2025 15:42:19 -0400 Subject: [PATCH 2/5] Add test --- .../test_async_contextvars_reset.py | 40 +++++++++++++++++++ tools/synchro.py | 1 + 2 files changed, 41 insertions(+) create mode 100644 test/asynchronous/test_async_contextvars_reset.py diff --git a/test/asynchronous/test_async_contextvars_reset.py b/test/asynchronous/test_async_contextvars_reset.py new file mode 100644 index 0000000000..6f46bec06b --- /dev/null +++ b/test/asynchronous/test_async_contextvars_reset.py @@ -0,0 +1,40 @@ +# Copyright 2025-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test that AsyncPeriodicExecutors do not copy ContextVars from their parents.""" +from __future__ import annotations + +import asyncio +import sys +from test.asynchronous.utils import async_get_pool +from test.utils_shared import delay, one + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest + + +class TestAsyncContextVarsReset(AsyncIntegrationTest): + async def test_context_vars_are_reset_in_executor(self): + client = self.simple_client() + + await client.db.test.insert_one({"x": 1}) + for server in client._topology._servers.values(): + for context in [ + c + for c in server._monitor._executor._task.get_context() + if c.name in ["TIMEOUT", "RTT", "DEADLINE"] + ]: + self.assertIn(context.get(), [None, float("inf"), 0.0]) + await client.db.test.delete_many({}) diff --git a/tools/synchro.py b/tools/synchro.py index 1fa8c674a5..906bfd00da 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -185,6 +185,7 @@ def async_only_test(f: str) -> bool: "test_concurrency.py", "test_async_cancellation.py", "test_async_loop_safety.py", + "test_async_contextvars_reset.py", ] From de3cbed72b044991ef23f7b91d61eaf2386dd66c Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 5 Jun 2025 16:00:22 -0400 Subject: [PATCH 3/5] Fix test versioning --- test/asynchronous/test_async_contextvars_reset.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/asynchronous/test_async_contextvars_reset.py b/test/asynchronous/test_async_contextvars_reset.py index 6f46bec06b..d898fa35a8 100644 --- a/test/asynchronous/test_async_contextvars_reset.py +++ b/test/asynchronous/test_async_contextvars_reset.py @@ -27,6 +27,8 @@ class TestAsyncContextVarsReset(AsyncIntegrationTest): async def test_context_vars_are_reset_in_executor(self): + if sys.version_info < (3, 11): + self.skipTest("Test requires asyncio.Task.get_context (added in Python 3.11)") client = self.simple_client() await client.db.test.insert_one({"x": 1}) From 0aa17085044be5278da43157f809053a3a542eed Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 5 Jun 2025 16:01:00 -0400 Subject: [PATCH 4/5] Fix test versioning --- test/asynchronous/test_async_contextvars_reset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/asynchronous/test_async_contextvars_reset.py b/test/asynchronous/test_async_contextvars_reset.py index d898fa35a8..9b0e2dc4dc 100644 --- a/test/asynchronous/test_async_contextvars_reset.py +++ b/test/asynchronous/test_async_contextvars_reset.py @@ -29,6 +29,7 @@ class TestAsyncContextVarsReset(AsyncIntegrationTest): async def test_context_vars_are_reset_in_executor(self): if sys.version_info < (3, 11): self.skipTest("Test requires asyncio.Task.get_context (added in Python 3.11)") + client = self.simple_client() await client.db.test.insert_one({"x": 1}) From da9ecaf9753afc001b1e0d9f3f72cbc299b51de8 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 6 Jun 2025 07:38:37 -0400 Subject: [PATCH 5/5] Remove comment --- pymongo/_asyncio_task.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pymongo/_asyncio_task.py b/pymongo/_asyncio_task.py index 4c35b3d724..7a528f027d 100644 --- a/pymongo/_asyncio_task.py +++ b/pymongo/_asyncio_task.py @@ -43,7 +43,6 @@ def cancelling(self) -> int: return self._cancel_requests -# We can directly pass an empty Context() object to create_task in Python >= 3.11 def create_task(coro: Coroutine[Any, Any, Any], *, name: Optional[str] = None) -> asyncio.Task: if sys.version_info >= (3, 11): return asyncio.create_task(coro, name=name)