From f3df50786a091f9d53a76ad1f50423095b5c06d3 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil Date: Wed, 23 Oct 2024 16:00:19 +0200 Subject: [PATCH 1/2] fix (StompMQConnector): add a timeout for the StompConnector --- src/DIRAC/Resources/MessageQueue/StompMQConnector.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py index 228dbbad14d..700e7d1feec 100644 --- a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py +++ b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py @@ -35,6 +35,7 @@ class StompMQConnector(MQConnector): RECONNECT_SLEEP_MAX = 120 # [s] The maximum delay that can be reached independent of increasing procedure. RECONNECT_SLEEP_JITTER = 0.1 # Random factor to add. 0.1 means a random number from 0 to 10% of the current time. RECONNECT_ATTEMPTS_MAX = 1e4 # Maximum attempts to reconnect. + STOMP_TIMEOUT = 60 PORT = 61613 @@ -72,6 +73,8 @@ def setupConnection(self, parameters=None): reconnectSleepJitter = self.parameters.get("ReconnectSleepJitter", StompMQConnector.RECONNECT_SLEEP_JITTER) reconnectAttemptsMax = self.parameters.get("ReconnectAttemptsMax", StompMQConnector.RECONNECT_ATTEMPTS_MAX) + stompTimeout = self.parameters.get("Timeout", StompMQConnector.STOMP_TIMEOUT) + host = self.parameters.get("Host") port = self.parameters.get("Port", StompMQConnector.PORT) vhost = self.parameters.get("VHost") @@ -83,6 +86,7 @@ def setupConnection(self, parameters=None): connectionArgs = { "vhost": vhost, "keepalive": True, + "timeout": stompTimeout, "reconnect_sleep_initial": reconnectSleepInitial, "reconnect_sleep_increase": reconnectSleepIncrease, "reconnect_sleep_max": reconnectSleepMax, From 612065e8d32f9296e3c5b21546d9affae0b0ebfa Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil Date: Thu, 24 Oct 2024 10:00:00 +0200 Subject: [PATCH 2/2] fix (StompMQConnector): add heartbeats parameter to StompConnector --- src/DIRAC/Resources/MessageQueue/StompMQConnector.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py index 700e7d1feec..f150925b7a2 100644 --- a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py +++ b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py @@ -35,6 +35,9 @@ class StompMQConnector(MQConnector): RECONNECT_SLEEP_MAX = 120 # [s] The maximum delay that can be reached independent of increasing procedure. RECONNECT_SLEEP_JITTER = 0.1 # Random factor to add. 0.1 means a random number from 0 to 10% of the current time. RECONNECT_ATTEMPTS_MAX = 1e4 # Maximum attempts to reconnect. + + OUTGOING_HEARTBEAT_MS = 15_000 + INCOMING_HEARTBEAT_MS = 15_000 STOMP_TIMEOUT = 60 PORT = 61613 @@ -73,6 +76,9 @@ def setupConnection(self, parameters=None): reconnectSleepJitter = self.parameters.get("ReconnectSleepJitter", StompMQConnector.RECONNECT_SLEEP_JITTER) reconnectAttemptsMax = self.parameters.get("ReconnectAttemptsMax", StompMQConnector.RECONNECT_ATTEMPTS_MAX) + outgoingHeartbeatMs = self.parameters.get("OutgoingHeartbeatMs", StompMQConnector.OUTGOING_HEARTBEAT_MS) + incomingHeartbeatMs = self.parameters.get("IncomingHeartbeatMs", StompMQConnector.INCOMING_HEARTBEAT_MS) + stompTimeout = self.parameters.get("Timeout", StompMQConnector.STOMP_TIMEOUT) host = self.parameters.get("Host") @@ -87,6 +93,7 @@ def setupConnection(self, parameters=None): "vhost": vhost, "keepalive": True, "timeout": stompTimeout, + "heartbeats": (outgoingHeartbeatMs, incomingHeartbeatMs), "reconnect_sleep_initial": reconnectSleepInitial, "reconnect_sleep_increase": reconnectSleepIncrease, "reconnect_sleep_max": reconnectSleepMax,