Skip to content

Commit 2d36847

Browse files
committed
Fixing Slurm/HPC issues (#2222)
* trying to resolve streaming freeze
* preventing Watchdog launch in improper conditions
* docformatter fix
1 parent 149bd79 commit 2d36847

File tree

3 files changed

+32
-34
lines changed

3 files changed

+32
-34
lines changed

src/ansys/fluent/core/launcher/launcher.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import os
1010
from pathlib import Path
1111
import platform
12+
import socket
1213
import subprocess
1314
import tempfile
1415
import time
@@ -457,8 +458,21 @@ def _generate_launch_string(
457458
return launch_string
458459

459460

461+
def _confirm_watchdog_start(start_watchdog, cleanup_on_exit, fluent_connection):
    """Decide whether the Watchdog should be started for a Fluent connection.

    An explicit ``start_watchdog`` value (``True`` or ``False``) is returned
    unchanged. When it is ``None`` and ``cleanup_on_exit`` is truthy, the
    Watchdog is enabled only if the host reported by the connection matches
    the name of the machine running this process.

    Parameters
    ----------
    start_watchdog : bool or None
        Requested Watchdog setting; ``None`` means "decide automatically".
    cleanup_on_exit : bool
        Whether cleanup on exit was requested. Automatic activation of the
        Watchdog is considered only when this is truthy.
    fluent_connection : object
        Connection object exposing ``connection_properties.cortex_host``.

    Returns
    -------
    bool or None
        ``True`` when the Watchdog was automatically enabled; otherwise the
        ``start_watchdog`` value that was passed in.
    """
    # Respect an explicit user choice, and do nothing without cleanup_on_exit.
    if start_watchdog is not None or not cleanup_on_exit:
        return start_watchdog

    host = fluent_connection.connection_properties.cortex_host
    local_host = socket.gethostname()
    # Host names are case-insensitive, so a pure casing difference between the
    # reported host and the local name must not be treated as a remote machine.
    # A non-string host (for example ``None``) never matches.
    if isinstance(host, str) and host.lower() == local_host.lower():
        logger.debug(
            "Fluent running on the host machine and 'cleanup_on_exit' activated, will launch Watchdog."
        )
        return True
    return start_watchdog
472+
473+
460474
def scm_to_py(topy, journal_filepaths):
461-
"""Convert journal filenames to Python filename."""
475+
"""Convert journal file names to Python file name."""
462476
fluent_jou_arg = "".join([f'-i "{journal}" ' for journal in journal_filepaths])
463477
if isinstance(topy, str):
464478
return f" {fluent_jou_arg} -topy={topy}"
@@ -648,13 +662,6 @@ def launch_fluent(
648662
"when starting a remote Fluent PyPIM client."
649663
)
650664

651-
if (
652-
start_watchdog is None
653-
and cleanup_on_exit
654-
and (fluent_launch_mode in (LaunchMode.CONTAINER, LaunchMode.STANDALONE))
655-
):
656-
start_watchdog = True
657-
658665
if dry_run and fluent_launch_mode != LaunchMode.CONTAINER:
659666
logger.warning(
660667
"'dry_run' argument for 'launch_fluent' currently is only "
@@ -757,6 +764,9 @@ def launch_fluent(
757764
launcher_args=argvals,
758765
inside_container=False,
759766
)
767+
start_watchdog = _confirm_watchdog_start(
768+
start_watchdog, cleanup_on_exit, session.fluent_connection
769+
)
760770
if start_watchdog:
761771
logger.info("Launching Watchdog for local Fluent client...")
762772
ip, port, password = _get_server_info(server_info_filepath)
@@ -844,6 +854,8 @@ def launch_fluent(
844854
)
845855
)
846856

857+
if start_watchdog is None and cleanup_on_exit:
858+
start_watchdog = True
847859
if start_watchdog:
848860
logger.debug("Launching Watchdog for Fluent container...")
849861
watchdog.launch(os.getpid(), port, password)
@@ -912,11 +924,12 @@ def connect_to_fluent(
912924
)
913925
new_session = _get_running_session_mode(fluent_connection)
914926

915-
if start_watchdog is None and cleanup_on_exit:
916-
start_watchdog = True
927+
start_watchdog = _confirm_watchdog_start(
928+
start_watchdog, cleanup_on_exit, fluent_connection
929+
)
917930

918931
if start_watchdog:
919-
logger.info("Launching Watchdog for existing Fluent connection...")
932+
logger.info("Launching Watchdog for existing Fluent session...")
920933
ip, port, password = _get_server_info(server_info_filepath, ip, port, password)
921934
watchdog.launch(os.getpid(), port, password, ip)
922935

src/ansys/fluent/core/streaming_services/field_data_streaming.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""Module for Field data streaming."""
22

3-
import threading
43
from typing import Callable, Dict, List, Union
54

65
from ansys.api.fluent.v0 import field_data_pb2 as FieldDataProtoModule
@@ -26,7 +25,6 @@ def __init__(self, session_id: str, service):
2625
streaming_service=service,
2726
)
2827
self._session_id: str = session_id
29-
self._lock_refresh: threading.Lock = threading.Lock()
3028

3129
def _process_streaming(self, id, stream_begin_method, started_evt, *args, **kwargs):
3230
"""Processes field data streaming."""

src/ansys/fluent/core/streaming_services/streaming.py

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import itertools
2+
import logging
23
import threading
34
from typing import Callable, Optional
45

6+
logger = logging.getLogger("pyfluent.networking")
7+
58

69
class StreamingService:
710
"""Encapsulates a Fluent streaming service."""
@@ -57,7 +60,7 @@ def unregister_callback(self, callback_id: str):
5760
del self._service_callbacks[callback_id]
5861

5962
def start(self, *args, **kwargs) -> None:
60-
"""Start streaming of Fluent transcript."""
63+
"""Start streaming."""
6164
with self._lock:
6265
if not self.is_streaming:
6366
self._prepare()
@@ -78,30 +81,14 @@ def start(self, *args, **kwargs) -> None:
7881
self._streaming = True
7982

8083
def stop(self) -> None:
81-
"""Stop streaming of Fluent transcript."""
84+
"""Stop streaming."""
8285
if self.is_streaming:
8386
self._streaming_service.end_streaming(self._id, self._stream_begin_method)
84-
self._stream_thread.join()
87+
self._stream_thread.join(timeout=5)
88+
if self._stream_thread.is_alive():
89+
logger.warning(f"Streaming service {self._id} is unresponsive.")
8590
self._streaming = False
8691
self._stream_thread = None
8792

88-
def refresh(self, session_id, event_info) -> None:
89-
"""Refresh stream.
90-
91-
Parameters
92-
----------
93-
session_id : str
94-
Name of the monitor set.
95-
event_info : object
96-
Event info object.
97-
98-
Returns
99-
-------
100-
None
101-
"""
102-
with self._lock_refresh:
103-
self.stop()
104-
self.start()
105-
10693
def _prepare(self):
10794
pass # Currently only used by monitor services.

0 commit comments

Comments
 (0)