diff --git a/README.md b/README.md index ad4920d..dc30b0a 100644 --- a/README.md +++ b/README.md @@ -15,10 +15,11 @@ An effective PC client and server is [mosquitto](https://mosquitto.org/). # This repository -This contains two separate projects: +This contains three separate projects: 1. A "resilient" asynchronous non-blocking MQTT driver. 2. A means of using a cheap ESP8266 module to bring MQTT to MicroPython platforms which lack a WiFi interface. + 3. A basic network hardware controller (WLAN) which the mqtt client uses ## 1. The "resilient" driver diff --git a/mqtt_as/__init__.py b/mqtt_as/__init__.py new file mode 100644 index 0000000..54f6128 --- /dev/null +++ b/mqtt_as/__init__.py @@ -0,0 +1 @@ +from .mqtt_as import MQTTClient, config # allow backwards compatible import statements diff --git a/mqtt_as/interfaces/__init__.py b/mqtt_as/interfaces/__init__.py new file mode 100644 index 0000000..83ee17a --- /dev/null +++ b/mqtt_as/interfaces/__init__.py @@ -0,0 +1,92 @@ +from uerrno import EINPROGRESS, ETIMEDOUT +import usocket +import uasyncio as asyncio + + +async def _g(): + pass + + +_type_coro = type(_g()) + + +# If a callback is passed, run it and return. +# If a coro is passed initiate it and return. +# coros are passed by name i.e. not using function call syntax. +def launch(func, tup_args): + res = func(*tup_args) + if isinstance(res, _type_coro): + res = asyncio.create_task(res) + return res + + +class BaseInterface: + def __init__(self, socket=None): + # Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection(). + self.BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT] + self.socket = socket or usocket # support for custom socket implementations + self._subs = [] + self._state = None + + async def connect(self): + """Serve connect request. Triggers callbacks if state changes""" + if await self._connect(): + self._change_state(True) + return True + return False + + async def _connect(self): + """Hardware specific connect method""" + # return True # if connection is successful, otherwise False + raise NotImplementedError() + + async def disconnect(self): + """Serve disconnect request. Triggers callbacks if state changes""" + if await self._disconnect(): + self._change_state(False) + return True + return False + + async def _disconnect(self): + """Hardware specific disconnect method""" + # return True # if disconnect is successful, otherwise False + raise NotImplementedError() + + async def reconnect(self): + """Serve reconnect request""" + return await self._reconnect() + + async def _reconnect(self): + """Hardware specific reconnect method""" + if await self._disconnect(): + return await self._connect() + return False + + def isconnected(self): + """"Checks if the interface is connected. Triggers callbacks if state changes""" + st = self._isconnected() + self._change_state(st) + return st + + def _isconnected(self): + """Hardware specific isconnected method""" + raise NotImplementedError() + + def _change_state(self, state): + """ + Private method executing all callbacks or creating asyncio tasks + on connection state changes + """ + st = self._state + if st != state: + self._state = state + if st is None and state is False: + # not triggering disconnect cbs when interface state was unknown + # (probably disconnected on startup) + return + for cb in self._subs: + launch(cb, (state,)) + + def subscribe(self, cb): + """Subscribe to interface connection state changes""" + self._subs.append(cb) diff --git a/mqtt_as/interfaces/linux.py b/mqtt_as/interfaces/linux.py new file mode 100644 index 0000000..e1b4d53 --- /dev/null +++ b/mqtt_as/interfaces/linux.py @@ -0,0 +1,12 @@ +from . import BaseInterface + + +class Linux(BaseInterface): + async def _disconnect(self): + return True # just to prevent errors we'll pretend to be disconnected. + + def _isconnected(self): + return True # always connected. + + async def _connect(self): + return True # always connected or can't do anything about it diff --git a/mqtt_as/interfaces/wlan/__init__.py b/mqtt_as/interfaces/wlan/__init__.py new file mode 100644 index 0000000..08955f5 --- /dev/null +++ b/mqtt_as/interfaces/wlan/__init__.py @@ -0,0 +1,11 @@ +from sys import platform + +if platform == "esp8266": + from .esp8266 import WLAN +elif platform == "esp32": + from .esp32 import WLAN +elif platform == "pyboard": + from .pyboard import WLAN +else: + # just try esp32 implementation. Seems most mature. + from .esp32 import WLAN diff --git a/mqtt_as/interfaces/wlan/esp32.py b/mqtt_as/interfaces/wlan/esp32.py new file mode 100644 index 0000000..4f7102d --- /dev/null +++ b/mqtt_as/interfaces/wlan/esp32.py @@ -0,0 +1,20 @@ +from .wlan_base import BaseWLAN +import network +import uasyncio as asyncio + + +class WLAN(BaseWLAN): + def __init__(self, ssid, wifi_pw): + super().__init__(ssid, wifi_pw) + # https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942 + self.BUSY_ERRORS += [118, 119] # Add in weird ESP32 errors + + async def _connect(self): + s = self._sta_if + s.active(True) + if not s.isconnected(): + s.connect(self._ssid, self._wifi_pw) + while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec. + await asyncio.sleep(1) + + return await self._check_reliability() diff --git a/mqtt_as/interfaces/wlan/esp8266.py b/mqtt_as/interfaces/wlan/esp8266.py new file mode 100644 index 0000000..4370d08 --- /dev/null +++ b/mqtt_as/interfaces/wlan/esp8266.py @@ -0,0 +1,30 @@ +from .wlan_base import BaseWLAN +import network +import uasyncio as asyncio + + +class WLAN(BaseWLAN): + def __init__(self, ssid=None, wifi_pw=None): + super().__init__(ssid, wifi_pw) + import esp + esp.sleep_type(0) # Improve connection integrity at cost of power consumption. + + async def _connect(self): + s = self._sta_if + if s.isconnected(): # 1st attempt, already connected. + return True + s.active(True) + s.connect() # ESP8266 remembers connection. + for _ in range(60): + if s.status() != network.STAT_CONNECTING: # Break out on fail or success. Check once per sec. + break + await asyncio.sleep(1) + if s.status() == network.STAT_CONNECTING: # might hang forever awaiting dhcp lease renewal or something else + s.disconnect() + await asyncio.sleep(1) + if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None: + s.connect(self._ssid, self._wifi_pw) + while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec. + await asyncio.sleep(1) + + return await self._check_reliability() diff --git a/mqtt_as/interfaces/wlan/pyboard.py b/mqtt_as/interfaces/wlan/pyboard.py new file mode 100644 index 0000000..d482ecc --- /dev/null +++ b/mqtt_as/interfaces/wlan/pyboard.py @@ -0,0 +1,17 @@ +from .wlan_base import BaseWLAN +import uasyncio as asyncio + + +class WLAN(BaseWLAN): + def __init__(self, ssid, wifi_pw): + super().__init__(ssid, wifi_pw) + + async def _connect(self): + s = self._sta_if + s.active(True) + s.connect(self._ssid, self._wifi_pw) + # Pyboard doesn't yet have STAT_CONNECTING constant + while s.status() in (1, 2): + await asyncio.sleep(1) + + return await self._check_reliability() diff --git a/mqtt_as/interfaces/wlan/wlan_base.py b/mqtt_as/interfaces/wlan/wlan_base.py new file mode 100644 index 0000000..2d0a5bc --- /dev/null +++ b/mqtt_as/interfaces/wlan/wlan_base.py @@ -0,0 +1,37 @@ +from .. import BaseInterface +import network +import uasyncio as asyncio + + +class BaseWLAN(BaseInterface): + def __init__(self, ssid=None, wifi_pw=None): + super().__init__() + self.DEBUG = False + self._ssid = ssid + self._wifi_pw = wifi_pw + # wifi credentials required for ESP32 / Pyboard D. Optional ESP8266 + self._sta_if = network.WLAN(network.STA_IF) + self._sta_if.active(True) + + async def _check_reliability(self): + s = self._sta_if + if not s.isconnected(): + return False + # Ensure connection stays up for a few secs. + if self.DEBUG: + print('Checking WiFi integrity.') + for _ in range(5): + if not s.isconnected(): + return False # in 1st 5 secs + await asyncio.sleep(1) + if self.DEBUG: + print('Got reliable connection') + return True + + async def _disconnect(self): + self._sta_if.disconnect() + await asyncio.sleep(1) + return True # not checking if really disconnected. + + def _isconnected(self): + return self._sta_if.isconnected() diff --git a/mqtt_as/mqtt_as.py b/mqtt_as/mqtt_as.py index cccb0eb..afda4f8 100644 --- a/mqtt_as/mqtt_as.py +++ b/mqtt_as/mqtt_as.py @@ -6,43 +6,31 @@ # Various improvements contributed by Kevin Köck. import gc -import usocket as socket +import sys import ustruct as struct +# imported here to optimize RAM usage +from .interfaces import BaseInterface + gc.collect() from ubinascii import hexlify import uasyncio as asyncio gc.collect() from utime import ticks_ms, ticks_diff -from uerrno import EINPROGRESS, ETIMEDOUT gc.collect() from micropython import const -from machine import unique_id -import network -gc.collect() -from sys import platform +if sys.platform != 'linux': + from machine import unique_id -VERSION = (0, 6, 1) +VERSION = (0, 7, 0) # Default short delay for good SynCom throughput (avoid sleep(0) with SynCom). _DEFAULT_MS = const(20) _SOCKET_POLL_DELAY = const(5) # 100ms added greatly to publish latency -# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection(). -if platform == 'esp32' or platform == 'esp32_LoBo': - # https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942 - BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] # Add in weird ESP32 errors -else: - BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT] - -ESP8266 = platform == 'esp8266' -ESP32 = platform == 'esp32' -PYBOARD = platform == 'pyboard' -LOBO = platform == 'esp32_LoBo' - # Default "do little" coro for optional user replacement async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program @@ -50,7 +38,7 @@ async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program config = { - 'client_id': hexlify(unique_id()), + 'client_id': hexlify(unique_id()) if sys.platform != 'linux' else 'linux', 'server': None, 'port': 0, 'user': '', @@ -65,10 +53,11 @@ async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program 'max_repubs': 4, 'will': None, 'subs_cb': lambda *_: None, - 'wifi_coro': eliza, + 'wifi_coro': None, 'connect_coro': eliza, 'ssid': None, 'wifi_pw': None, + 'interface': None, } @@ -111,14 +100,23 @@ def __init__(self, config): self._lw_topic = False else: self._set_last_will(*will) - # WiFi config - self._ssid = config['ssid'] # Required for ESP32 / Pyboard D. Optional ESP8266 - self._wifi_pw = config['wifi_pw'] + # Interface config + if 'interface' not in config or config['interface'] is None: + if sys.platform == 'linux': + from .interfaces.linux import Linux + self._interface = Linux() + else: + # assume WLAN interface, backwards compatibility + from .interfaces.wlan import WLAN + self._interface = WLAN(config['ssid'], config['wifi_pw']) + else: + self._interface: BaseInterface = config['interface'] self._ssl = config['ssl'] self._ssl_params = config['ssl_params'] # Callbacks and coros self._cb = config['subs_cb'] - self._wifi_handler = config['wifi_coro'] + if config['wifi_coro']: + self._interface.subscribe(config['wifi_coro']) self._connect_handler = config['connect_coro'] # Network self.port = config['port'] @@ -128,8 +126,6 @@ def __init__(self, config): if self.server is None: raise ValueError('no server specified.') self._sock = None - self._sta_if = network.WLAN(network.STA_IF) - self._sta_if.active(True) self.newpid = pid_gen() self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response @@ -164,7 +160,7 @@ async def _as_read(self, n, sock=None): # OSError caught by superclass msg = sock.read(n - len(data)) except OSError as e: # ESP32 issues weird 119 errors here msg = None - if e.args[0] not in BUSY_ERRORS: + if e.args[0] not in self._interface.BUSY_ERRORS: raise if msg == b'': # Connection closed by host raise OSError(-1) @@ -188,7 +184,7 @@ async def _as_write(self, bytes_wr, length=0, sock=None): n = sock.write(bytes_wr) except OSError as e: # ESP32 issues weird 119 errors here n = 0 - if e.args[0] not in BUSY_ERRORS: + if e.args[0] not in self._interface.BUSY_ERRORS: raise if n: t = ticks_ms() @@ -211,12 +207,12 @@ async def _recv_len(self): sh += 7 async def _connect(self, clean): - self._sock = socket.socket() + self._sock = self._interface.socket.socket() self._sock.setblocking(False) try: self._sock.connect(self._addr) except OSError as e: - if e.args[0] not in BUSY_ERRORS: + if e.args[0] not in self._interface.BUSY_ERRORS: raise await asyncio.sleep_ms(_DEFAULT_MS) self.dprint('Connecting to broker.') @@ -271,7 +267,8 @@ async def wan_ok(self, if not self.isconnected(): # WiFi is down return False length = 32 # DNS query and response packet size - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s = self._interface.socket.socket(self._interface.socket.AF_INET, + self._interface.socket.SOCK_DGRAM) s.setblocking(False) s.connect(('8.8.8.8', 53)) await asyncio.sleep(1) @@ -458,61 +455,14 @@ def __init__(self, config): self._ping_interval = p_i self._in_connect = False self._has_connected = False # Define 'Clean Session' value to use. - if ESP8266: - import esp - esp.sleep_type(0) # Improve connection integrity at cost of power consumption. - - async def wifi_connect(self): - s = self._sta_if - if ESP8266: - if s.isconnected(): # 1st attempt, already connected. - return - s.active(True) - s.connect() # ESP8266 remembers connection. - for _ in range(60): - if s.status() != network.STAT_CONNECTING: # Break out on fail or success. Check once per sec. - break - await asyncio.sleep(1) - if s.status() == network.STAT_CONNECTING: # might hang forever awaiting dhcp lease renewal or something else - s.disconnect() - await asyncio.sleep(1) - if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None: - s.connect(self._ssid, self._wifi_pw) - while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec. - await asyncio.sleep(1) - else: - s.active(True) - s.connect(self._ssid, self._wifi_pw) - if PYBOARD: # Doesn't yet have STAT_CONNECTING constant - while s.status() in (1, 2): - await asyncio.sleep(1) - elif LOBO: - i = 0 - while not s.isconnected(): - await asyncio.sleep(1) - i += 1 - if i >= 10: - break - else: - while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec. - await asyncio.sleep(1) - - if not s.isconnected(): - raise OSError - # Ensure connection stays up for a few secs. - self.dprint('Checking WiFi integrity.') - for _ in range(5): - if not s.isconnected(): - raise OSError # in 1st 5 secs - await asyncio.sleep(1) - self.dprint('Got reliable connection') async def connect(self): if not self._has_connected: - await self.wifi_connect() # On 1st call, caller handles error + if not await self._interface.connect(): # On 1st call, caller handles error + raise OSError # Note this blocks if DNS lookup occurs. Do it once to prevent # blocking during later internet outage: - self._addr = socket.getaddrinfo(self.server, self.port)[0][-1] + self._addr = self._interface.socket.getaddrinfo(self.server, self.port)[0][-1] self._in_connect = True # Disable low level ._isconnected check clean = self._clean if self._has_connected else self._clean_init try: @@ -524,7 +474,6 @@ async def connect(self): # If we get here without error broker/LAN must be up. self._isconnected = True self._in_connect = False # Low level code can now check connectivity. - asyncio.create_task(self._wifi_handler(True)) # User handler. if not self._has_connected: self._has_connected = True # Use normal clean flag on reconnect. asyncio.create_task( @@ -578,7 +527,7 @@ async def _memory(self): def isconnected(self): if self._in_connect: # Disable low-level check during .connect() return True - if self._isconnected and not self._sta_if.isconnected(): # It's going down. + if self._isconnected and not self._interface.isconnected(): # It's going down. self._reconnect() return self._isconnected @@ -586,7 +535,6 @@ def _reconnect(self): # Schedule a reconnection if not underway. if self._isconnected: self._isconnected = False self.close() - asyncio.create_task(self._wifi_handler(False)) # User handler. # Await broker connection. async def _connection(self): @@ -601,11 +549,7 @@ async def _keep_connected(self): await asyncio.sleep(1) gc.collect() else: - self._sta_if.disconnect() - await asyncio.sleep(1) - try: - await self.wifi_connect() - except OSError: + if not await self._interface.reconnect(): continue if not self._has_connected: # User has issued the terminal .disconnect() self.dprint('Disconnected, exiting _keep_connected') diff --git a/mqtt_as/clean.py b/tests/clean.py similarity index 100% rename from mqtt_as/clean.py rename to tests/clean.py diff --git a/mqtt_as/config.py b/tests/config.py similarity index 100% rename from mqtt_as/config.py rename to tests/config.py diff --git a/mqtt_as/lowpower.py b/tests/lowpower.py similarity index 100% rename from mqtt_as/lowpower.py rename to tests/lowpower.py diff --git a/mqtt_as/main.py b/tests/main.py similarity index 100% rename from mqtt_as/main.py rename to tests/main.py diff --git a/mqtt_as/pubtest b/tests/pubtest old mode 100755 new mode 100644 similarity index 100% rename from mqtt_as/pubtest rename to tests/pubtest diff --git a/mqtt_as/range.py b/tests/range.py similarity index 100% rename from mqtt_as/range.py rename to tests/range.py diff --git a/mqtt_as/range_ex.py b/tests/range_ex.py similarity index 100% rename from mqtt_as/range_ex.py rename to tests/range_ex.py diff --git a/mqtt_as/tls.py b/tests/tls.py similarity index 100% rename from mqtt_as/tls.py rename to tests/tls.py diff --git a/mqtt_as/tls32.py b/tests/tls32.py similarity index 100% rename from mqtt_as/tls32.py rename to tests/tls32.py diff --git a/mqtt_as/tls8266.py b/tests/tls8266.py similarity index 100% rename from mqtt_as/tls8266.py rename to tests/tls8266.py diff --git a/mqtt_as/unclean.py b/tests/unclean.py similarity index 100% rename from mqtt_as/unclean.py rename to tests/unclean.py