From 865f080e75ba772f617e45659b1998137654b384 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 2 May 2024 09:38:31 +0200 Subject: [PATCH 1/5] Don't use ToPickle when sending graph --- distributed/client.py | 7 +++---- distributed/scheduler.py | 11 +++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index df976d0e058..01878ec0cf1 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3150,9 +3150,9 @@ def _graph_to_futures( futures = {key: Future(key, self, inform=False) for key in keyset} # Circular import from distributed.protocol import serialize - from distributed.protocol.serialize import ToPickle + from distributed.protocol.serialize import Serialize, Serialized, ToPickle - header, frames = serialize(ToPickle(dsk), on_error="raise") + header, frames = serialize(Serialize(dsk), on_error="raise") pickled_size = sum(map(nbytes, [header] + frames)) if pickled_size > parse_bytes( @@ -3170,8 +3170,7 @@ def _graph_to_futures( self._send_to_scheduler( { "op": "update-graph", - "graph_header": header, - "graph_frames": frames, + "graph": Serialized(header, frames), "keys": list(keys), "internal_priority": internal_priority, "submitting_task": getattr(thread_state, "key", None), diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d25793c08f4..1bbff6682dc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -105,7 +105,7 @@ from distributed.proctitle import setproctitle from distributed.protocol import deserialize from distributed.protocol.pickle import dumps, loads -from distributed.protocol.serialize import Serialized, ToPickle, serialize +from distributed.protocol.serialize import Serialize, Serialized, serialize from distributed.publish import PublishExtension from distributed.pubsub import PubSubSchedulerExtension from distributed.queues import QueueExtension @@ -3439,7 +3439,7 @@ def _task_to_msg(self, ts: TaskState, duration: float = -1) -> 
dict[str, Any]: for dts in ts.dependencies }, "nbytes": {dts.key: dts.nbytes for dts in ts.dependencies}, - "run_spec": ToPickle(ts.run_spec), + "run_spec": Serialize(ts.run_spec), "resource_restrictions": ts.resource_restrictions, "actor": ts.actor, "annotations": ts.annotations or {}, @@ -4664,8 +4664,7 @@ def _create_taskstate_from_graph( async def update_graph( self, client: str, - graph_header: dict, - graph_frames: list[bytes], + graph_ser: Serialized, keys: set[Key], internal_priority: dict[Key, int] | None, submitting_task: Key | None, @@ -4679,8 +4678,8 @@ async def update_graph( start = time() try: try: - graph = deserialize(graph_header, graph_frames).data - del graph_header, graph_frames + graph = deserialize(graph_ser.header, graph_ser.frames) + del graph_ser except Exception as e: msg = """\ Error during deserialization of the task graph. This frequently From a75c993bb632126f9b488dc4be0ef5819a686abc Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 2 May 2024 13:20:34 +0200 Subject: [PATCH 2/5] fix kwarg name --- distributed/scheduler.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1bbff6682dc..0eba28cb9f7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4664,7 +4664,7 @@ def _create_taskstate_from_graph( async def update_graph( self, client: str, - graph_ser: Serialized, + graph: Serialized, keys: set[Key], internal_priority: dict[Key, int] | None, submitting_task: Key | None, @@ -4678,8 +4678,7 @@ async def update_graph( start = time() try: try: - graph = deserialize(graph_ser.header, graph_ser.frames) - del graph_ser + graph = deserialize(graph.header, graph.frames) except Exception as e: msg = """\ Error during deserialization of the task graph. 
This frequently @@ -4694,7 +4693,7 @@ async def update_graph( annotations_by_type, ) = await offload( _materialize_graph, - graph=graph, + graph=graph, # type: ignore[arg-type] global_annotations=annotations or {}, ) del graph From 2d5158a28e4e8f43243412d562407797d466f83d Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 2 May 2024 13:54:16 +0200 Subject: [PATCH 3/5] fix update graph test --- distributed/tests/test_scheduler.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 68ccfbfccbd..6be8c4be0a5 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -44,9 +44,8 @@ from distributed.compatibility import LINUX, MACOS, WINDOWS, PeriodicCallback from distributed.core import ConnectionPool, Status, clean_exception, connect, rpc from distributed.metrics import time -from distributed.protocol import serialize +from distributed.protocol import Serialize, Serialized, serialize from distributed.protocol.pickle import dumps, loads -from distributed.protocol.serialize import ToPickle from distributed.scheduler import KilledWorker, MemoryState, Scheduler, WorkerState from distributed.utils import TimeoutError, wait_for from distributed.utils_test import ( @@ -1431,10 +1430,8 @@ async def test_update_graph_culls(s, a, b): dependencies={"foo": set()}, ) - header, frames = serialize(ToPickle(dsk), on_error="raise") await s.update_graph( - graph_header=header, - graph_frames=frames, + graph=Serialized(serialize(Serialize(dsk), on_error="raise")), keys=["y"], client="client", internal_priority={k: 0 for k in "xyz"}, From b3ef5ce39b58d1672c7435d74738a5fba701a65b Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 3 May 2024 11:21:28 +0200 Subject: [PATCH 4/5] fix --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py 
index 6be8c4be0a5..4d0a9b49e98 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1431,7 +1431,7 @@ async def test_update_graph_culls(s, a, b): ) await s.update_graph( - graph=Serialized(serialize(Serialize(dsk), on_error="raise")), + graph=Serialized(*serialize(Serialize(dsk), on_error="raise")), keys=["y"], client="client", internal_priority={k: 0 for k in "xyz"}, From a990563c4a65f407fa45bf7dab68dcaff4ab25b9 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 3 May 2024 17:43:37 +0200 Subject: [PATCH 5/5] use ToPickle --- distributed/client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 01878ec0cf1..1e2fcd96699 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3150,9 +3150,10 @@ def _graph_to_futures( futures = {key: Future(key, self, inform=False) for key in keyset} # Circular import from distributed.protocol import serialize - from distributed.protocol.serialize import Serialize, Serialized, ToPickle + from distributed.protocol.serialize import Pickled, ToPickle - header, frames = serialize(Serialize(dsk), on_error="raise") + # This is pulled out to have better exception messages + header, frames = serialize(ToPickle(dsk), on_error="raise") pickled_size = sum(map(nbytes, [header] + frames)) if pickled_size > parse_bytes( @@ -3170,7 +3171,7 @@ def _graph_to_futures( self._send_to_scheduler( { "op": "update-graph", - "graph": Serialized(header, frames), + "graph": Pickled(header, frames), "keys": list(keys), "internal_priority": internal_priority, "submitting_task": getattr(thread_state, "key", None),