Skip to content

Commit f5426b1

Browse files
author
Andrew Utkin
committed
Fix deadlock in GlobalKaleidoServer under concurrent access
The shared `_return_queue` causes a race condition when multiple threads call `to_image` concurrently: `task_queue.join()` unblocks all waiters when the count hits zero, not when a specific task completes. Two threads race to `return_queue.get()`, one wins, the other blocks forever. Fix: give each caller its own `Queue` embedded in the `Task`. The server routes the result directly into it. No sharing, no race, no deadlock. Additionally, `_sync` functions now auto-start the singleton server instead of falling back to `oneshot_async_run`, which creates a new thread and event loop per call and can also hang under load. The `atexit` handler that called `close()` → `thread.join()` is removed since the server thread is already `daemon=True` and will exit with the process. The `join()` could hang if Chromium shutdown blocks. Reproduces ~1/1000 calls with 64 threads. See https://github.com/plotly/plotly.py/issues/5549
1 parent b021c3d commit f5426b1

2 files changed

Lines changed: 17 additions & 29 deletions

File tree

src/py/kaleido/__init__.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -162,29 +162,24 @@ async def write_fig_from_object(
162162
)
163163

164164

165+
def _ensure_server() -> None:
166+
if not _global_server.is_running():
167+
_global_server.open(silence_warnings=True)
168+
169+
165170
def calc_fig_sync(*args: Any, **kwargs: Any):
166171
"""Call `calc_fig` but blocking."""
167-
if _global_server.is_running():
168-
return _global_server.call_function("calc_fig", *args, **kwargs)
169-
else:
170-
return _sync_server.oneshot_async_run(calc_fig, args=args, kwargs=kwargs)
172+
_ensure_server()
173+
return _global_server.call_function("calc_fig", *args, **kwargs)
171174

172175

173176
def write_fig_sync(*args: Any, **kwargs: Any):
174177
"""Call `write_fig` but blocking."""
175-
if _global_server.is_running():
176-
return _global_server.call_function("write_fig", *args, **kwargs)
177-
else:
178-
return _sync_server.oneshot_async_run(write_fig, args=args, kwargs=kwargs)
178+
_ensure_server()
179+
_global_server.call_function("write_fig", *args, **kwargs)
179180

180181

181182
def write_fig_from_object_sync(*args: Any, **kwargs: Any):
182183
"""Call `write_fig_from_object` but blocking."""
183-
if _global_server.is_running():
184-
return _global_server.call_function("write_fig_from_object", *args, **kwargs)
185-
else:
186-
return _sync_server.oneshot_async_run(
187-
write_fig_from_object,
188-
args=args,
189-
kwargs=kwargs,
190-
)
184+
_ensure_server()
185+
_global_server.call_function("write_fig_from_object", *args, **kwargs)

src/py/kaleido/_sync_server.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4-
import atexit
54
import warnings
6-
from functools import partial
75
from queue import Queue
86
from threading import Thread
97
from typing import TYPE_CHECKING, NamedTuple
@@ -18,6 +16,7 @@ class Task(NamedTuple):
1816
fn: str
1917
args: Any
2018
kwargs: Any
19+
result_queue: Queue # per-caller mailbox
2120

2221

2322
class _BadFunctionName(BaseException):
@@ -37,13 +36,11 @@ async def _server(self, *args, **kwargs):
3736
if not hasattr(k, task.fn):
3837
raise _BadFunctionName(f"Kaleido has no attribute {task.fn}")
3938
try:
40-
self._return_queue.put(
39+
task.result_queue.put(
4140
await getattr(k, task.fn)(*task.args, **task.kwargs),
4241
)
4342
except Exception as e: # noqa: BLE001
44-
self._return_queue.put(e)
45-
46-
self._task_queue.task_done()
43+
task.result_queue.put(e)
4744

4845
def __new__(cls):
4946
# Create the singleton on first instantiation
@@ -72,11 +69,8 @@ def open(self, *args: Any, silence_warnings=False, **kwargs: Any) -> None:
7269
daemon=True,
7370
)
7471
self._task_queue: Queue[Task | None] = Queue()
75-
self._return_queue: Queue[Any] = Queue()
7672
self._thread.start()
7773
self._initialized = True
78-
close = partial(self.close, silence_warnings=True)
79-
atexit.register(close)
8074

8175
def close(self, *, silence_warnings=False):
8276
"""Reset the singleton back to an uninitialized state."""
@@ -92,7 +86,6 @@ def close(self, *, silence_warnings=False):
9286
self._thread.join()
9387
del self._thread
9488
del self._task_queue
95-
del self._return_queue
9689
self._initialized = False
9790

9891
def call_function(self, cmd: str, *args: Any, **kwargs: Any):
@@ -117,9 +110,9 @@ def call_function(self, cmd: str, *args: Any, **kwargs: Any):
117110
UserWarning,
118111
stacklevel=3,
119112
)
120-
self._task_queue.put(Task(cmd, args, kwargs))
121-
self._task_queue.join()
122-
res = self._return_queue.get()
113+
my_queue: Queue[Any] = Queue(maxsize=1)
114+
self._task_queue.put(Task(cmd, args, kwargs, my_queue))
115+
res = my_queue.get()
123116
if isinstance(res, BaseException):
124117
raise res
125118
else:

0 commit comments

Comments
 (0)