Skip to content

Changed library to use background loop instead of manually looping #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ python:
- '3.7'
before_install:
- docker pull eclipse-mosquitto
- docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto:/mosquitto/config eclipse-mosquitto
- docker run -d -p 1883:1883 eclipse-mosquitto
- docker run -d -p 1883:1883 -v $(pwd)/mosquitto/mosquitto-no-passwd.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
- docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf -v $(pwd)/mosquitto/passwd_file:/mosquitto/config/passwd_file eclipse-mosquitto
install:
- pip install -r requirements.txt
script:
Expand Down
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ The keywords in this library are based on some of the methods available in eclip
The tests are in ``tests`` folder and make use of Robot Framework itself. They are run automatically through travis when code is pushed to a branch. When run locally, these tests rely on locally running mqtt brokers. We need 2 running brokers, one without auth that is used by most of the tests, and the other one with auth (configuration file is provided). You'll need to start them before running the tests. You can then run the tests locally::

docker pull eclipse-mosquitto
docker run -d -p 1883:1883 eclipse-mosquitto
docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto:/mosquitto/config eclipse-mosquitto
docker run -d -p 1883:1883 -v $(pwd)/mosquitto/mosquitto-no-passwd.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf -v $(pwd)/mosquitto/passwd_file:/mosquitto/config/passwd_file eclipse-mosquitto
robot -P src tests


Expand Down
2 changes: 2 additions & 0 deletions mosquitto/mosquitto-no-passwd.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
listener 1883
allow_anonymous true
3 changes: 2 additions & 1 deletion mosquitto/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
allow_anonymous false
password_file ./mosquitto/config/passwd_file
password_file /mosquitto/config/passwd_file
listener 1883
158 changes: 86 additions & 72 deletions src/MQTTLibrary/MQTTKeywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,17 @@ def connect(self, broker, port=1883, client_id="", clean_session=True):
self._mqttc.connect(broker, int(port))

timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if self._connected or self._unexpected_disconnect:
break;
self._mqttc.loop()

if self._unexpected_disconnect:
raise RuntimeError("The client disconnected unexpectedly")
logger.debug('client_id: %s' % self._mqttc._client_id)
try:
self._mqttc.loop_start()
while time.time() < timer_start + self._loop_timeout:
time.sleep(1e-3)
if self._connected or self._unexpected_disconnect:
break
if self._unexpected_disconnect:
raise RuntimeError("The client disconnected unexpectedly")
logger.debug('client_id: %s' % self._mqttc._client_id)
finally:
self._mqttc.loop_stop()
return self._mqttc

def publish(self, topic, message=None, qos=0, retain=False):
Expand All @@ -108,22 +111,26 @@ def publish(self, topic, message=None, qos=0, retain=False):
| Publish | test/test | test message | 1 | ${false} |

"""
logger.info('Publish topic: %s, message: %s, qos: %s, retain: %s'
% (topic, message, qos, retain))
self._mid = -1
self._mqttc.on_publish = self._on_publish
result, mid = self._mqttc.publish(topic, message, int(qos), retain)
if result != 0:
raise RuntimeError('Error publishing: %s' % result)

timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if mid == self._mid:
break;
self._mqttc.loop()

if mid != self._mid:
logger.warn('mid wasn\'t matched: %s' % mid)
try:
self._mqttc.loop_start()
logger.info('Publish topic: %s, message: %s, qos: %s, retain: %s'
% (topic, message, qos, retain))
self._mid = -1
self._mqttc.on_publish = self._on_publish

result, mid = self._mqttc.publish(topic, message, int(qos), retain)
if result != 0:
raise RuntimeError('Error publishing: %s' % result)

timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if mid == self._mid:
break

if mid != self._mid:
logger.warn('mid wasn\'t matched: %s' % mid)
finally:
self._mqttc.loop_stop()

def subscribe(self, topic, qos, timeout=1, limit=1):
""" Subscribe to a topic and return a list of message payloads received
Expand Down Expand Up @@ -166,17 +173,16 @@ def subscribe(self, topic, qos, timeout=1, limit=1):
return self._messages[topic]

timer_start = time.time()
while time.time() < timer_start + seconds:
if limit == 0 or len(self._messages[topic]) < limit:
self._mqttc.loop()
else:
# workaround for client to ack the publish. Otherwise,
# it seems that if client disconnects quickly, broker
# will not get the ack and publish the message again on
# next connect.
time.sleep(1)
break

self._mqttc.loop_start()
try:
while time.time() < timer_start + seconds:
if limit == 0 or len(self._messages[topic]) < limit:
time.sleep(1e-3)
finally:
self._mqttc.loop_stop()
return self._messages[topic]


def listen(self, topic, timeout=1, limit=1):
""" Listen to a topic and return a list of message payloads received
Expand All @@ -202,7 +208,7 @@ def listen(self, topic, timeout=1, limit=1):
timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if self._subscribed:
break;
break
time.sleep(1)
if not self._subscribed:
logger.warn('Cannot listen when not subscribed to a topic')
Expand All @@ -223,22 +229,15 @@ def listen(self, topic, timeout=1, limit=1):

logger.info('Listening on topic: %s' % topic)
timer_start = time.time()
while time.time() < timer_start + seconds:
if limit == 0 or len(self._messages[topic]) < limit:
# If the loop is running in the background
# merely sleep here for a second or so and continue
# otherwise, do the loop ourselves
if self._background_mqttc:
time.sleep(1)
self._mqttc.loop_start()
try:
while time.time() < timer_start + seconds:
if limit == 0 or len(self._messages[topic]) < limit:
time.sleep(1e-3)
else:
self._mqttc.loop()
else:
# workaround for client to ack the publish. Otherwise,
# it seems that if client disconnects quickly, broker
# will not get the ack and publish the message again on
# next connect.
time.sleep(1)
break
break
finally:
self._mqttc.loop_stop()

messages = self._messages[topic][:] # Copy the list's contents
self._messages[topic] = []
Expand Down Expand Up @@ -273,13 +272,17 @@ def subscribe_and_validate(self, topic, qos, payload, timeout=1):
self._mqttc.subscribe(str(topic), int(qos))

timer_start = time.time()
while time.time() < timer_start + seconds:
if self._verified:
break
self._mqttc.loop()
self._mqttc.loop_start()
try:
while time.time() < timer_start + seconds:
if self._verified:
break
time.sleep(1e-3)

if not self._verified:
raise AssertionError("The expected payload didn't arrive in the topic")
if not self._verified:
raise AssertionError("The expected payload didn't arrive in the topic")
finally:
self._mqttc.loop_stop()

def unsubscribe(self, topic):
""" Unsubscribe the client from the specified topic.
Expand Down Expand Up @@ -309,13 +312,17 @@ def unsubscribe(self, topic):
self._mqttc.on_unsubscribe = self._on_unsubscribe
self._mqttc.unsubscribe(str(topic))

self._mqttc.loop_start()
timer_start = time.time()
while (not self._unsubscribed and
time.time() < timer_start + self._loop_timeout):
self._mqttc.loop()
try:
while (not self._unsubscribed and
time.time() < timer_start + self._loop_timeout):
time.sleep(1e-3)

if not self._unsubscribed:
logger.warn('Client didn\'t receive an unsubscribe callback')
if not self._unsubscribed:
logger.warn('Client didn\'t receive an unsubscribe callback')
finally:
self._mqttc.loop_stop()

def disconnect(self):
""" Disconnect from MQTT Broker.
Expand All @@ -336,12 +343,18 @@ def disconnect(self):
self._mqttc.disconnect()

timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if self._disconnected or self._unexpected_disconnect:
break;
self._mqttc.loop()
if self._unexpected_disconnect:
raise RuntimeError("The client disconnected unexpectedly")
self._mqttc.loop_start()
try:
while time.time() < timer_start + self._loop_timeout:
if self._disconnected or self._unexpected_disconnect:
break

if self._unexpected_disconnect:
raise RuntimeError("The client disconnected unexpectedly")

finally:
self._mqttc.loop_stop()


def publish_single(self, topic, payload=None, qos=0, retain=False,
hostname="localhost", port=1883, client_id="", keepalive=60,
Expand Down Expand Up @@ -446,11 +459,12 @@ def _on_connect(self, client, userdata, flags, rc):
self._connected = True if rc == 0 else False

def _on_disconnect(self, client, userdata, rc):
if rc == 0:
self._disconnected = True
self._unexpected_disconnect = False
else:
self._unexpected_disconnect = True
pass
# if rc == 0:
# self._disconnected = True
# self._unexpected_disconnect = False
# else:
# self._unexpected_disconnect = True

def _on_subscribe(self, client, userdata, mid, granted_qos):
self._subscribed = True
Expand Down