Skip to content

Commit fcd560c

Browse files
authored
Add support for MQTT 3.1.1 (#90)
* Add draft for an MQTT 3.1.1 message processor Since MQTT 3.1.1 is only an extension and clarification of MQTT 3.1, we can base the implementation of MQTT 3.1.1 on 3.1. * Improve MQTT 3.1.1 implementation and add tests for processor * Improve tests to cover relevant MQTT 3.1.1 features * Reduce log level when broker rejects subscription
1 parent ffaf8d8 commit fcd560c

File tree

6 files changed

+542
-13
lines changed

6 files changed

+542
-13
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PhpMqtt\Client\MessageProcessors;
6+
7+
use PhpMqtt\Client\Exceptions\InvalidMessageException;
8+
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
9+
use PhpMqtt\Client\Message;
10+
use PhpMqtt\Client\MessageType;
11+
12+
/**
13+
* This message processor implements the MQTT protocol version 3.1.1.
14+
*
15+
* @package PhpMqtt\Client\MessageProcessors
16+
*/
17+
class Mqtt311MessageProcessor extends Mqtt31MessageProcessor
18+
{
19+
/**
20+
* {@inheritDoc}
21+
*/
22+
protected function getEncodedProtocolNameAndVersion(): string
23+
{
24+
return $this->buildLengthPrefixedString('MQTT') . chr(0x04); // protocol version (4)
25+
}
26+
27+
/**
28+
* {@inheritDoc}
29+
*/
30+
public function parseAndValidateMessage(string $message): ?Message
31+
{
32+
$result = parent::parseAndValidateMessage($message);
33+
34+
if ($this->isPublishMessageWithNullCharacter($result)) {
35+
throw new ProtocolViolationException('The broker sent us a message with the forbidden unicode character U+0000.');
36+
}
37+
38+
return $result;
39+
}
40+
41+
/**
42+
* {@inheritDoc}
43+
*/
44+
protected function parseAndValidateSubscribeAcknowledgementMessage(string $data): Message
45+
{
46+
if (strlen($data) < 3) {
47+
$this->logger->notice('Received invalid subscribe acknowledgement from the broker.');
48+
throw new InvalidMessageException('Received invalid subscribe acknowledgement from the broker.');
49+
}
50+
51+
$messageId = $this->decodeMessageId($this->pop($data, 2));
52+
53+
// Parse and validate the QoS acknowledgements.
54+
$acknowledgements = array_map('ord', str_split($data));
55+
foreach ($acknowledgements as $acknowledgement) {
56+
if (!in_array($acknowledgement, [0, 1, 2, 128])) {
57+
throw new InvalidMessageException('Received subscribe acknowledgement with invalid QoS values from the broker.');
58+
}
59+
}
60+
61+
return (new Message(MessageType::SUBSCRIBE_ACKNOWLEDGEMENT()))
62+
->setMessageId($messageId)
63+
->setAcknowledgedQualityOfServices($acknowledgements);
64+
}
65+
66+
/**
67+
* Determines if the given message is a PUBLISH message and contains the unicode null character U+0000.
68+
*
69+
* @param Message $message
70+
* @return bool
71+
*/
72+
private function isPublishMessageWithNullCharacter(Message $message): bool
73+
{
74+
return $message !== null
75+
&& $message->getType()->equals(MessageType::PUBLISH())
76+
&& $message->getContent() !== null
77+
&& preg_match('/\x{0000}/u', $message->getContent());
78+
}
79+
}

src/MessageProcessors/Mqtt31MessageProcessor.php

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ public function tryFindMessageInBuffer(string $buffer, int $bufferLength, string
9797
public function buildConnectMessage(ConnectionSettings $connectionSettings, bool $useCleanSession = false): string
9898
{
9999
// The protocol name and version.
100-
$buffer = $this->buildLengthPrefixedString('MQIsdp');
101-
$buffer .= chr(0x03); // protocol version (3)
100+
$buffer = $this->getEncodedProtocolNameAndVersion();
102101

103102
// Build connection flags based on the connection settings.
104103
$buffer .= chr($this->buildConnectionFlags($connectionSettings, $useCleanSession));
@@ -130,6 +129,16 @@ public function buildConnectMessage(ConnectionSettings $connectionSettings, bool
130129
return $header . $buffer;
131130
}
132131

132+
/**
133+
* Returns the encoded protocol name and version, ready to be sent as part of the CONNECT message.
134+
*
135+
* @return string
136+
*/
137+
protected function getEncodedProtocolNameAndVersion(): string
138+
{
139+
return $this->buildLengthPrefixedString('MQIsdp') . chr(0x03); // protocol version (3)
140+
}
141+
133142
/**
134143
* Builds the connection flags from the inputs and settings.
135144
*
@@ -362,7 +371,7 @@ public function buildPublishMessage(
362371
if ($qualityOfService > self::QOS_AT_MOST_ONCE) {
363372
$command += $qualityOfService << 1;
364373
}
365-
if ($isDuplicate) {
374+
if ($qualityOfService > self::QOS_AT_MOST_ONCE && $isDuplicate) {
366375
$command += 1 << 3;
367376
}
368377

@@ -462,7 +471,7 @@ public function parseAndValidateMessage(string $message): ?Message
462471
break;
463472
}
464473

465-
// If we arrive here, we must have parsed a message with an unsupported type and it cannot be
474+
// If we arrive here, we must have parsed a message with an unsupported type, and it cannot be
466475
// very relevant for us. So we return an empty result without information to skip processing.
467476
return null;
468477
}

src/MqttClient.php

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use PhpMqtt\Client\Exceptions\PendingMessageNotFoundException;
2020
use PhpMqtt\Client\Exceptions\ProtocolNotSupportedException;
2121
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
22+
use PhpMqtt\Client\MessageProcessors\Mqtt311MessageProcessor;
2223
use PhpMqtt\Client\MessageProcessors\Mqtt31MessageProcessor;
2324
use PhpMqtt\Client\Repositories\MemoryRepository;
2425
use Psr\Log\LoggerInterface;
@@ -34,7 +35,8 @@ class MqttClient implements ClientContract
3435
OffersHooks,
3536
ValidatesConfiguration;
3637

37-
const MQTT_3_1 = '3.1';
38+
const MQTT_3_1 = '3.1';
39+
const MQTT_3_1_1 = '3.1.1';
3840

3941
const QOS_AT_MOST_ONCE = 0;
4042
const QOS_AT_LEAST_ONCE = 1;
@@ -84,7 +86,7 @@ public function __construct(
8486
LoggerInterface $logger = null
8587
)
8688
{
87-
if (!in_array($protocol, [self::MQTT_3_1])) {
89+
if (!in_array($protocol, [self::MQTT_3_1, self::MQTT_3_1_1])) {
8890
throw new ProtocolNotSupportedException($protocol);
8991
}
9092

@@ -95,9 +97,14 @@ public function __construct(
9597
$this->logger = new Logger($this->host, $this->port, $this->clientId, $logger);
9698

9799
switch ($protocol) {
100+
case self::MQTT_3_1_1:
101+
$this->messageProcessor = new Mqtt311MessageProcessor($this->clientId, $this->logger);
102+
break;
103+
98104
case self::MQTT_3_1:
99105
default:
100106
$this->messageProcessor = new Mqtt31MessageProcessor($this->clientId, $this->logger);
107+
break;
101108
}
102109

103110
$this->initializeEventHandlers();
@@ -798,6 +805,15 @@ protected function handleMessage(Message $message): void
798805
}
799806

800807
foreach ($message->getAcknowledgedQualityOfServices() as $index => $qualityOfService) {
808+
// Starting from MQTT 3.1.1, the broker is able to reject individual subscriptions.
809+
// Instead of failing the whole bulk, we log the incident and skip the single subscription.
810+
if ($qualityOfService === 128) {
811+
$this->logger->notice('The broker rejected the subscription to [{topicFilter}].', [
812+
'topicFilter' => $acknowledgedSubscriptions[$index]->getTopicFilter(),
813+
]);
814+
continue;
815+
}
816+
801817
// It may happen that the server registers our subscription
802818
// with a lower quality of service than requested, in this
803819
// case this is the one that we will record.

tests/Feature/ConnectWithCustomConnectionSettingsTest.php

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,41 @@
1717
*/
1818
class ConnectWithCustomConnectionSettingsTest extends TestCase
1919
{
20-
public function test_connecting_with_custom_connection_settings_works_as_intended(): void
20+
public function test_connecting_using_mqtt31_with_custom_connection_settings_works_as_intended(): void
2121
{
22-
$client = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-custom-connection-settings');
22+
$client = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-custom-connection-settings', MqttClient::MQTT_3_1);
23+
24+
$connectionSettings = (new ConnectionSettings)
25+
->setLastWillTopic('foo/last/will')
26+
->setLastWillMessage('baz is out!')
27+
->setLastWillQualityOfService(MqttClient::QOS_AT_MOST_ONCE)
28+
->setRetainLastWill(true)
29+
->setConnectTimeout(3)
30+
->setSocketTimeout(3)
31+
->setResendTimeout(3)
32+
->setKeepAliveInterval(30)
33+
->setUsername(null)
34+
->setPassword(null)
35+
->setUseTls(false)
36+
->setTlsCertificateAuthorityFile(null)
37+
->setTlsCertificateAuthorityPath(null)
38+
->setTlsClientCertificateFile(null)
39+
->setTlsClientCertificateKeyFile(null)
40+
->setTlsClientCertificateKeyPassphrase(null)
41+
->setTlsVerifyPeer(false)
42+
->setTlsVerifyPeerName(false)
43+
->setTlsSelfSignedAllowed(true);
44+
45+
$client->connect($connectionSettings);
46+
47+
$this->assertTrue($client->isConnected());
48+
49+
$client->disconnect();
50+
}
51+
52+
public function test_connecting_using_mqtt311_with_custom_connection_settings_works_as_intended(): void
53+
{
54+
$client = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-custom-connection-settings', MqttClient::MQTT_3_1_1);
2355

2456
$connectionSettings = (new ConnectionSettings)
2557
->setLastWillTopic('foo/last/will')

tests/Feature/SupportedProtocolsTest.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ public function test_client_supports_mqtt_3_1_protocol(): void
2424
$this->assertInstanceOf(MqttClient::class, $client);
2525
}
2626

27-
public function test_client_does_not_support_mqtt_3_protocol(): void
27+
public function test_client_supports_mqtt_3_1_1_protocol(): void
2828
{
29-
$this->expectException(ProtocolNotSupportedException::class);
29+
$client = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-protocol', MqttClient::MQTT_3_1_1);
3030

31-
new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-protocol', '3');
31+
$this->assertInstanceOf(MqttClient::class, $client);
3232
}
3333

34-
public function test_client_does_not_support_mqtt_3_1_1_protocol(): void
34+
public function test_client_does_not_support_mqtt_3_protocol(): void
3535
{
3636
$this->expectException(ProtocolNotSupportedException::class);
3737

38-
new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-protocol', '3.1.1');
38+
new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-protocol', '3');
3939
}
4040

4141
public function test_client_does_not_support_mqtt_5_protocol(): void

0 commit comments

Comments
 (0)