From 8e3ea61c3f7c8acac8219b8ae7bc280686a5637a Mon Sep 17 00:00:00 2001 From: Joseph Cosentino Date: Fri, 7 Jun 2024 01:39:51 -0700 Subject: [PATCH] working pub sub (#13) --- .gitignore | 1 + README.md | 7 +-- include/Context.h | 1 + include/Network.h | 2 +- include/Packet.h | 3 +- src/Command.cpp | 146 +++++++++++++++++++++++++++++++++++++++------- src/Context.cpp | 2 + src/Network.cpp | 9 +-- src/Packet.cpp | 36 +++++++++--- src/main.cpp | 8 +-- system-test.sh | 24 +++++++- 11 files changed, 193 insertions(+), 46 deletions(-) diff --git a/.gitignore b/.gitignore index 5acb669..43c76f4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ build .vscode +system-test-output diff --git a/README.md b/README.md index c437655..9021945 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,6 @@ Options: ## Subscribe to a topic -> :warning: Work in progress. Subscribe request is sent, but still need to actually listen for payloads. - ``` mqtt sub -t hello/topic -a localhost:1883 -c clientId ``` @@ -44,9 +42,10 @@ Options: | MQTT3 | x | | | MQTT5 | | x | | QOS0 | x | | -| QOS1/2 | | x | +| QOS1 | x | | +| QOS2 | | x | | PUBLISH | x | | -| SUBSCRIBE | | x | +| SUBSCRIBE | x | | | Various CONNECT options | | x | | SSL | | x | diff --git a/include/Context.h b/include/Context.h index ee1ec27..bbc686d 100644 --- a/include/Context.h +++ b/include/Context.h @@ -10,6 +10,7 @@ struct Context { std::string message; std::string address; std::string clientId; + uint8_t qos; // TODO make configurable bool verbose; }; diff --git a/include/Network.h b/include/Network.h index 948ee5e..c1f1726 100644 --- a/include/Network.h +++ b/include/Network.h @@ -5,7 +5,7 @@ namespace MqttClient { -using Payload = std::vector; +using Payload = std::vector; class Network { public: diff --git a/include/Packet.h b/include/Packet.h index 2ca4a1e..10513f0 100644 --- a/include/Packet.h +++ b/include/Packet.h @@ -11,11 +11,12 @@ class PacketBuilder { public: PacketBuilder(std::shared_ptr context); - bool connect(Payload &payload); // TODO more specific error handling? + bool connect(Payload &payload); bool publish(Payload &payload); bool subscribe(Payload &payload); private: std::shared_ptr mContext; + uint16_t packetId(); }; } // namespace MqttClient \ No newline at end of file diff --git a/src/Command.cpp b/src/Command.cpp index 5287cc2..c51b1a9 100644 --- a/src/Command.cpp +++ b/src/Command.cpp @@ -1,5 +1,6 @@ #include "Command.h" #include "Context.h" +#include "Interrupt.h" #include "Packet.h" #include #include @@ -19,7 +20,8 @@ Command::Command(std::shared_ptr context) void Command::execute() { if (mContext->verbose) { - std::cout << "Executing " << mContext->command << " command\n"; + std::cout << "Executing " << mContext->command << " command" + << std::endl; } if (!connect()) { @@ -36,22 +38,23 @@ void Command::execute() { bool Command::connect() { Payload conn; if (!mPacketBuilder.connect(conn)) { - std::cerr << "Unable to build CONNECT packet\n"; + std::cerr << "Unable to build CONNECT packet" << std::endl; return false; } if (!mNetwork.netSend(conn)) { - std::cerr << "Unable to send CONNECT: " << strerror(errno) << "\n"; + std::cerr << "Unable to send CONNECT: " << strerror(errno) << "" + << std::endl; return false; } if (mContext->verbose) { - std::cout << "CONNECT sent\n"; + std::cout << "CONNECT sent" << std::endl; } Payload connAck; if (!mNetwork.netRecv(connAck, 4) || connAck[0] != 0b00100000) { - std::cerr << "CONNACK not received\n"; + std::cerr << "CONNACK not received" << std::endl; return false; } @@ -60,7 +63,7 @@ bool Command::connect() { if (mContext->verbose) { std::cout << "CONNACK received! return code: " << returnCode - << ", session present: " << sessionPresent << "\n"; + << ", session present: " << sessionPresent << "" << std::endl; } return true; } @@ -68,60 +71,159 @@ bool Command::connect() { bool Command::publish() { Payload publish; if (!mPacketBuilder.publish(publish)) { - std::cerr << "Unable to build PUBLISH packet\n"; + std::cerr << "Unable to build PUBLISH packet" << std::endl; return false; } if (!mNetwork.netSend(publish)) { - std::cerr << "Unable to send PUBLISH: " << strerror(errno) << "\n"; + std::cerr << "Unable to send PUBLISH: " << strerror(errno) << "" + << std::endl; return false; } if (mContext->verbose) { - std::cout << "PUBLISH sent\n"; + std::cout << "PUBLISH sent" << std::endl; + } + + // TODO support QoS 2 + if (mContext->qos == 1) { + Payload pubAck; + if (!mNetwork.netRecv(pubAck, 4)) { + std::cerr << "PUBACK not received" << std::endl; + return false; + } + if (pubAck[0] != 0b01000000) { + std::cerr << "Another packet received instead of PUBACK" + << std::endl; + return false; + } + if (mContext->verbose) { + std::cout << "PUBACK received" << std::endl; + } } - // TODO read PUBACK - return true; } bool Command::subscribe() { Payload subscribe; if (!mPacketBuilder.subscribe(subscribe)) { - std::cerr << "Unable to build SUBSCRIBE packet\n"; + std::cerr << "Unable to build SUBSCRIBE packet" << std::endl; return false; } if (!mNetwork.netSend(subscribe)) { - std::cerr << "Unable to send SUBSCRIBE: " << strerror(errno) << "\n"; + std::cerr << "Unable to send SUBSCRIBE: " << strerror(errno) << "" + << std::endl; return false; } if (mContext->verbose) { - std::cout << "SUBSCRIBE sent\n"; + std::cout << "SUBSCRIBE sent" << std::endl; } Payload subAck; - if (!mNetwork.netRecv(subAck, 4)) { - std::cerr << "SUBACK not received\n"; + if (!mNetwork.netRecv(subAck, 5)) { + std::cerr << "SUBACK not received" << std::endl; return false; } - unsigned char packetType = subAck[0]; + auto packetType = subAck[0]; if (packetType != 0b10010000) { - std::cerr << "Another packet received instead of SUBACK\n"; + std::cerr << "Another packet received instead of SUBACK" << std::endl; return false; } - if (subAck[3] > 2) { - int ret = subAck[3]; - std::cerr << "Subscription failed. return code: " << ret << "\n"; + if (subAck[4] != mContext->qos) { + std::cerr << "Subscription failed" << std::endl; return false; } if (mContext->verbose) { - std::cout << "SUBACK sent\n"; + std::cout << "SUBACK received" << std::endl; + } + + // receive PUBLISH messages until interrupted + while (!interrupted.load()) { + uint8_t qos; + { + Payload packet; + if (!mNetwork.netRecv(packet, 1)) { + if (mContext->verbose) { + std::cerr << "Unable to receive PUBLISH packet" + << std::endl; + } + continue; + } + if (interrupted.load()) { + return true; + } + packetType = packet[0] & 0b11110000; + qos = packet[0] & 0b00000110; + + if (packetType != 0b00110000) { + if (mContext->verbose) { + std::cerr << "Unexpected packet type received"; + } + continue; + } + } + + uint8_t remainingLen; + { + Payload packet; + if (!mNetwork.netRecv(packet, 1)) { + if (mContext->verbose) { + std::cerr << "Unable to receive remaining length" + << std::endl; + } + continue; + } + if (interrupted.load()) { + return true; + } + remainingLen = packet[0]; + } + + std::string message; + { + Payload packet; + if (!mNetwork.netRecv(packet, remainingLen)) { + if (mContext->verbose) { + std::cerr << "Unable to receive rest of PUBLISH packet" + << std::endl; + } + continue; + } + if (interrupted.load()) { + return true; + } + + int messageStart = 0; + + uint16_t topicLen = packet[1] + (packet[0] << 8); + messageStart += 2; + messageStart += topicLen; + + if (qos > 0) { + messageStart += 2; // packet id + } + + if (messageStart == packet.size()) { + message = ""; + } else if (messageStart < packet.size()) { + message = + std::string(packet.begin() + messageStart, packet.end()); + } else { + if (mContext->verbose) { + std::cerr << "Not enough buffer for message payload" + << std::endl; + } + continue; + } + } + + std::cout << message << std::endl; } return true; diff --git a/src/Context.cpp b/src/Context.cpp index 3dca551..23b3e41 100644 --- a/src/Context.cpp +++ b/src/Context.cpp @@ -91,6 +91,8 @@ ParseResult parseContext(int argc, char *argv[]) noexcept { context.message = argv[optind + 1]; } + context.qos = 1; // TODO make configurable + return {ParseResultCode::SUCCESS, {}, context}; } diff --git a/src/Network.cpp b/src/Network.cpp index 157d1df..555d403 100644 --- a/src/Network.cpp +++ b/src/Network.cpp @@ -40,7 +40,7 @@ bool Network::netRecv(Payload &payload, size_t len) { while (totalReceived < len) { auto received = recv(mSock, payload.data() + totalReceived, len - totalReceived, 0); - if (received == 0) { + if (received <= 0) { netClose(); return false; } @@ -70,7 +70,8 @@ bool Network::netConnect() { hints.ai_protocol = IPPROTO_TCP; if (getaddrinfo(host.data(), port.data(), &hints, &addrs) != 0) { - std::cerr << "unable to resolve address: " << strerror(errno) << "\n"; + std::cerr << "unable to resolve address: " << strerror(errno) << "" + << std::endl; return false; } @@ -81,8 +82,8 @@ bool Network::netConnect() { } if (connect(mSock, addr->ai_addr, addr->ai_addrlen) < 0) { - std::cerr << "unable to connect to host: " << strerror(errno) - << "\n"; + std::cerr << "unable to connect to host: " << strerror(errno) << "" + << std::endl; freeaddrinfo(addrs); netClose(); return false; diff --git a/src/Packet.cpp b/src/Packet.cpp index fe8ee5b..5b5af5e 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -1,6 +1,7 @@ #include "Packet.h" #include "Context.h" #include +#include namespace MqttClient { PacketBuilder::PacketBuilder(std::shared_ptr context) @@ -58,10 +59,18 @@ bool PacketBuilder::publish(Payload &payload) { // BEGIN FIXED HEADER // ------------------ // MQTT control packet type (3) - // TODO support dup , qos, retain - payload.push_back(0b00110000); + // TODO support dup, retain + auto controlPacket = 0b00110000; + // set qos + controlPacket = controlPacket | mContext->qos << 1; + payload.push_back(controlPacket); // remaining length - payload.push_back((2 + mContext->topic.size()) + mContext->message.size()); + uint8_t remainingLength = + (2 + mContext->topic.size()) + mContext->message.size(); + if (mContext->qos > 0) { + remainingLength += 2; // packet identifier + } + payload.push_back(remainingLength); // BEGIN VARIABLE HEADER // --------------------- @@ -74,6 +83,13 @@ bool PacketBuilder::publish(Payload &payload) { for (size_t i = 0; i < mContext->topic.size(); ++i) { payload.push_back(mContext->topic[i]); } + if (mContext->qos > 0) { + // packet identifier MSB + // TODO + payload.push_back(0); + // packet identifier LSB + payload.push_back(packetId()); + } // BEGIN PAYLOAD // ------------- @@ -98,8 +114,7 @@ bool PacketBuilder::subscribe(Payload &payload) { // TODO payload.push_back(0); // packet identifier LSB - // TODO support a real identifier - payload.push_back(1); + payload.push_back(packetId()); // BEGIN PAYLOAD // ------------- @@ -113,10 +128,17 @@ bool PacketBuilder::subscribe(Payload &payload) { payload.push_back(mContext->topic[i]); } // topic QoS - // TODO - payload.push_back(0); + payload.push_back(mContext->qos); return true; } +uint16_t PacketBuilder::packetId() { + // TODO rework this + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distribution(200, 500); + return distribution(gen); +} + } // namespace MqttClient \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 03952eb..e54c363 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -11,11 +11,11 @@ using namespace MqttClient; int main(int argc, char *argv[]) { if (signal(SIGINT, onInterrupt) == SIG_ERR) { - std::cerr << "Unable to register SIGINT handler\n"; + std::cerr << "Unable to register SIGINT handler" << std::endl; return 1; } if (signal(SIGTERM, onInterrupt) == SIG_ERR) { - std::cerr << "Unable to register SIGTERM handler\n"; + std::cerr << "Unable to register SIGTERM handler" << std::endl; return 1; } @@ -29,7 +29,7 @@ int main(int argc, char *argv[]) { } return 0; case ParseResultCode::FAILURE: - std::cerr << res.error << "\n"; + std::cerr << res.error << "" << std::endl; if (res.context.command == "sub") { std::cout << USAGE_SUB; } else { @@ -48,7 +48,7 @@ int main(int argc, char *argv[]) { << ", topic=" << context.topic << ", address=" << context.address << ", clientId=" << context.clientId - << ", message=" << context.message << "]\n"; + << ", message=" << context.message << "]" << std::endl; } auto command = new Command(std::make_shared(context)); diff --git a/system-test.sh b/system-test.sh index b0b5089..57d42db 100755 --- a/system-test.sh +++ b/system-test.sh @@ -1,4 +1,22 @@ #!/usr/bin/env bash -set -xe -./build/mqtt pub -t hello/topic -v -a broker.emqx.io:1883 -c client '{"hello": "world"}' -./build/mqtt sub -t hello/topic -v -a broker.emqx.io:1883 -c client +set -e + +# given a subscription +./build/mqtt sub -v -t hello/topic -a broker.emqx.io:1883 -c mqttclient1 &> system-test-output & +PID=$! +sleep 1 + +# publish a message +./build/mqtt pub -v -t hello/topic -a broker.emqx.io:1883 -c mqttclient2 '{"hello": "world"}' +sleep 1 + +# then verify the subscription received the message +if grep -q '{"hello": "world"}' system-test-output; then + echo "Message received successfully" + kill -9 $PID &> /dev/null # TODO SIGINT, after making recv play nicer with interrupts + exit 0 +else + echo "Message not received" + kill -9 $PID + exit 1 +fi