Skip to content

Commit

Permalink
Rate control: add a mechanism to limit the maximum publish on mqtt
Browse files Browse the repository at this point in the history
In some cases like network connection issue, the transport may
queue many publish request to be sent to the broker.
Without any limitation it may affect the broker performances.

In fact if several gateways are in this state and back at the same time,
it may generate a huge amount of traffic at the broker level. Much higher than
the broker capacity.

To limit this problem, a limit in number of publish per seconds can be set with
option --mqtt_rate_limit_pps (or with global variable WM_SERVICES_MQTT_RATE_LIMIT_PPS)

This value must be carefully chosen. If too low it may reduce the capacity
of the gateway. For example if set to 10 but gateway receives in average (under
normal condition) 15 packet from sink(s), gateway will always bufferize.
On the other hand if set to a too high value, it will not protect the broker.

As a rule of thumb, setting it to twice the maximum number of pps expected at
the gateway (full packets, not fragment) is a good tradeof.
With such value, if a gateway is offline for 1h, it will take another 1h
when the connection is back to recover and republish the message received
during the connection loss.

For example if 10pps is expected and limit set to 20, and the connection
is lost for 1h:
- publish for 1h (10 pps are bufferized during 1h)
- connection is back
- publish at 20 pps(limit set): 10 pps from normal traffic + 10pps from buffers
- after 1h, buffers are empty, only 10 pps (normal traffic)
  • Loading branch information
GwendalRaoul committed Aug 2, 2024
1 parent bbcb38c commit aff767a
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 46 deletions.
160 changes: 114 additions & 46 deletions python_transport/wirepas_gateway/protocol/mqtt_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import ssl
from select import select
from threading import Thread, Lock
from time import sleep
from time import sleep, monotonic
from datetime import datetime

from paho.mqtt import client as mqtt
Expand Down Expand Up @@ -74,6 +74,7 @@ def __init__(
"Max inflight messages set to %s", settings.mqtt_max_inflight_messages
)
self._client.max_inflight_messages_set(settings.mqtt_max_inflight_messages)
self._max_inflight_messages = settings.mqtt_max_inflight_messages

self._client.username_pw_set(settings.mqtt_username, settings.mqtt_password)
self._client.on_connect = self._on_connect
Expand Down Expand Up @@ -112,7 +113,9 @@ def __init__(
if not self._use_websockets and self._client.socket() is not None:
self._client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

self._publish_queue = SelectableQueue()
self._publish_queue = SelectableQueue(rate_limit_pps=settings.mqtt_rate_limit_pps)
if settings.mqtt_rate_limit_pps >= 0:
logging.info("Rate control set to %s", settings.mqtt_rate_limit_pps)

# Thread is not started yes
self.running = False
Expand Down Expand Up @@ -160,18 +163,14 @@ def _do_select(self, sock):
self._client.loop_write()

self._client.loop_misc()

# Check if we have something to publish
if self._publish_queue in r:
try:
# Publish everything. Loop is not necessary as
# next select will exit immediately if queue not empty
while True:
while len(self._unpublished_mid_set) < self._max_inflight_messages:
# Publish a single packet from our queue
topic, payload, qos, retain = self._publish_queue.get()

self._publish_from_wrapper_thread(
topic, payload, qos=qos, retain=retain
)
info = self._client.publish(topic, payload, qos=qos, retain=retain)
self._unpublished_mid_set.add(info.mid)

# FIX: read internal sockpairR as it is written but
# never read as we don't use the internal paho loop
Expand Down Expand Up @@ -268,20 +267,6 @@ def run(self):
# thread has exited
self.on_termination_cb()

def _publish_from_wrapper_thread(self, topic, payload, qos, retain):
"""Internal method to publish on Mqtt. This method is only called from
mqtt wrapper thread to avoid races.
Args:
topic: Topic to publish on
payload: Payload
qos: Qos to use
retain: Is it a retain message
"""
mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid
self._unpublished_mid_set.add(mid)

def publish(self, topic, payload, qos=1, retain=False) -> None:
""" Method to publish to Mqtt from any thread
Expand Down Expand Up @@ -310,17 +295,26 @@ def publish_waiting_time_s(self):
return self._publish_monitor.get_publish_waiting_time_s()


class SelectableQueue(queue.Queue):
class SelectableQueue(queue.LifoQueue):
"""
Wrapper arround a Queue to make it selectable with an associated
socket
socket and with a built-in rate limit in term of reading
Args:
rate_limit_pps: maximum number of get during one second, None for unlimited
"""

def __init__(self):
def __init__(self, rate_limit_pps=None):
super().__init__()
self._putsocket, self._getsocket = socket.socketpair()
self._lock = Lock()
self._size = 0
if rate_limit_pps == 0:
# 0 is same as no limit
rate_limit_pps = None
self.rate_limit_pps = rate_limit_pps
self._get_ts_list = list()
self._signal_scheduled = False
self._signaled = False
self._signal_lock = Lock()

def fileno(self):
"""
Expand All @@ -330,26 +324,100 @@ def fileno(self):
return self._getsocket.fileno()

def put(self, item, block=True, timeout=None):
with self._lock:
if self._size == 0:
# Send 1 byte on socket to signal select
# Insert item in queue
super().put(item, block, timeout)
self._signal()

def _signal(self, delay_s=0):
with self._signal_lock:
if self._signaled:
return

if self._signal_scheduled:
return

def _signal_with_delay(delay_s):
sleep(delay_s)
with self._signal_lock:
self._signal_scheduled = False
self._putsocket.send(b"x")
self._signaled = True

if delay_s > 0:
self._signal_scheduled = True
Thread(target=_signal_with_delay, args=[delay_s]).start()
else:
# No delay needed, signal directly
self._putsocket.send(b"x")
self._size = self._size + 1

# Insert item in queue
super().put(item, block, timeout)

def get(self, block=False, timeout=None):
with self._lock:
# Get item first so get can be called and
# raise empty exception
item = super().get(block, timeout)
self._signaled = True

def _unsignal(self):
with self._signal_lock:
self._getsocket.recv(1)
self._signaled = False

def _get_current_rate(self):
# First of all, remove the element that are older than 1 second
now = monotonic()
for i in range(len(self._get_ts_list) - 1, -1, -1):
if (self._get_ts_list[i] + 1) < now:
del self._get_ts_list[: i + 1]
break

return len(self._get_ts_list)

def _get_next_time(self):
# Compute when next room will be available
# in moving window
# Return value is between 0 and 1

# Note that _get_current_rate should have been called before
# so that items are all queued for less than 1s
now = monotonic()
if len(self._get_ts_list) > 0:
queued_time = now - self._get_ts_list[0]
if queued_time <= 1:
return 1 - queued_time

return 0

def _is_rate_limit_reached(self):
# Rate limit is computed on last second
if self.rate_limit_pps is None:
# No rate control
return False

if self._get_current_rate() >= self.rate_limit_pps:
# We have reached rate limit
# compute when new room is present
logging.debug("Over the rate limit still {} paquet queued".format(self.qsize()))
# How many time remains for first entry
return True

return False

def get(self):
if self._is_rate_limit_reached():
# We are over the limit so clear select
self._unsignal()
# Start a task to signal available messages
self._signal(delay_s=self._get_next_time())
# There is something to get but rate limit is reached
# so it is empty from consumer point of view
raise queue.Empty

# Get item first so get can be called and
# raise empty exception
try:
item = super().get(False, None)
# If rate limt set, add the get
if self.rate_limit_pps is not None:
self._get_ts_list.append(monotonic())

self._size = self._size - 1
if self._size == 0:
# Consume 1 byte from socket
self._getsocket.recv(1)
return item
except queue.Empty as e:
self._unsignal()
raise e


class PublishMonitor:
Expand Down
12 changes: 12 additions & 0 deletions python_transport/wirepas_gateway/utils/argument_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,18 @@ def add_mqtt(self):
),
)

self.mqtt.add_argument(
"--mqtt_rate_limit_pps",
default=os.environ.get("WM_SERVICES_MQTT_RATE_LIMIT_PPS", 0),
action="store",
type=self.str2int,
help=(
"Max rate limit for the mqtt client to publish on mqtt broker. It can be set to "
"protect the broker from very high usage when one or more gateways are offline for a while "
"and publish all their buffers when connection to broker is restored"
),
)

def add_buffering_settings(self):
""" Parameters used to avoid black hole case """
self.buffering.add_argument(
Expand Down

0 comments on commit aff767a

Please sign in to comment.