diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index b2b846ee46..5a3daa0a28 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -20,6 +20,7 @@ enable_proctitle_on_children, enable_proctitle_on_current, ) +from distributed.scheduler import DEFAULT_SCHEDULER_PORT logger = logging.getLogger("distributed.scheduler") @@ -165,9 +166,9 @@ def main( if port is None and (not host or not re.search(r":\d", host)): if isinstance(protocol, list): - port = [8786] + [0] * (len(protocol) - 1) + port = [DEFAULT_SCHEDULER_PORT] + [0] * (len(protocol) - 1) else: - port = 8786 + port = DEFAULT_SCHEDULER_PORT if isinstance(protocol, list) or isinstance(port, list): if (not isinstance(protocol, list) or not isinstance(port, list)) or len( diff --git a/distributed/cli/dask_ssh.py b/distributed/cli/dask_ssh.py index f449774537..353b43d0a5 100755 --- a/distributed/cli/dask_ssh.py +++ b/distributed/cli/dask_ssh.py @@ -8,6 +8,7 @@ import click from distributed.deploy.old_ssh import SSHCluster +from distributed.scheduler import DEFAULT_SCHEDULER_PORT logger = logging.getLogger("distributed.dask_ssh") @@ -30,7 +31,7 @@ ) @click.option( "--scheduler-port", - default=8786, + default=DEFAULT_SCHEDULER_PORT, show_default=True, type=int, help="Specify scheduler port number.", diff --git a/distributed/core.py b/distributed/core.py index a4bb031c12..f17d3f55a7 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -717,13 +717,32 @@ async def listen(self, port_or_addr=None, allow_offload=True, **kwargs): else: addr = port_or_addr assert isinstance(addr, str) - listener = await listen( - addr, - self.handle_comm, - deserialize=self.deserialize, - allow_offload=allow_offload, - **kwargs, - ) + try: + listener = await listen( + addr, + self.handle_comm, + deserialize=self.deserialize, + allow_offload=allow_offload, + **kwargs, + ) + except OSError: + fallback_port_or_addr = kwargs.get("fallback_port_or_addr", None) + if not fallback_port_or_addr: + raise + warnings.warn( + f"Address {addr} is already in use.\n" + f"Falling back to {fallback_port_or_addr} instead", + UserWarning, + stacklevel=2, + ) + listener = await listen( + fallback_port_or_addr, + self.handle_comm, + deserialize=self.deserialize, + allow_offload=allow_offload, + **kwargs, + ) + self.listeners.append(listener) def handle_comm(self, comm: Comm) -> NoOpAwaitable: diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index b7f736e1ed..aa5378e6e0 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -123,7 +123,7 @@ def __init__( start=None, host=None, ip=None, - scheduler_port=0, + scheduler_port=None, silence_logs=logging.WARN, dashboard_address=":8787", worker_dashboard_address=None, diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d1f18d7c2f..21f34446fe 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -194,6 +194,8 @@ "stealing": WorkStealing, } +DEFAULT_SCHEDULER_PORT = 8786 + class ClientState: """A simple object holding information about a client.""" @@ -3674,7 +3676,6 @@ class Scheduler(SchedulerState, ServerNode): Time we expect certain functions to take, e.g. ``{'sum': 0.25}`` """ - default_port = 8786 _instances: ClassVar[weakref.WeakSet[Scheduler]] = weakref.WeakSet() worker_ttl: float | None @@ -3785,8 +3786,18 @@ def __init__( interface=interface, protocol=protocol, security=security, - default_port=self.default_port, + default_port=DEFAULT_SCHEDULER_PORT, ) + if port is None: + self._fallback_start_addresses = addresses_from_user_args( + host=host, + port=0, + interface=interface, + protocol=protocol, + security=security, + ) + else: + self._fallback_start_addresses = [] http_server_modules = dask.config.get("distributed.scheduler.http.routes") show_dashboard = dashboard or (dashboard is None and dashboard_address) @@ -4199,11 +4210,14 @@ async def start_unsafe(self) -> Self: self._clear_task_state() - for addr in self._start_address: + for addr, fallback_addr in itertools.zip_longest( + self._start_address, self._fallback_start_addresses + ): await self.listen( addr, allow_offload=False, handshake_overrides={"pickle-protocol": 4, "compression": None}, + fallback_port_or_addr=fallback_addr, **self.security.get_listen_args("scheduler"), ) self.ip = get_address_host(self.listen_address) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 334f1eced4..9dc7011cc0 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -59,6 +59,7 @@ from distributed.node import ServerNode from distributed.proctitle import enable_proctitle_on_children from distributed.protocol import deserialize +from distributed.scheduler import DEFAULT_SCHEDULER_PORT from distributed.scheduler import TaskState as SchedulerTaskState from distributed.security import Security from distributed.utils import ( @@ -2480,7 +2481,7 @@ def _bind_port(port): s.listen(1) yield s - default_ports = [8786] + default_ports = [DEFAULT_SCHEDULER_PORT] while time() - start < _TEST_TIMEOUT: try: