From fff728b05e186ec18aa0d2d79a04b3ab412eb6e3 Mon Sep 17 00:00:00 2001 From: Joseph Cosentino Date: Thu, 6 Jun 2024 22:10:46 -0700 Subject: [PATCH] add subscribe command (#11) --- README.md | 23 +++++++-- include/Command.h | 1 + include/Context.h | 15 +++++- include/Network.h | 2 +- include/Packet.h | 1 + src/Command.cpp | 45 +++++++++++++++++- src/Context.cpp | 49 ++++++++++++------- src/Packet.cpp | 119 +++++++++++++++++++++++++++++++++------------- src/main.cpp | 12 ++++- system-test.sh | 1 + 10 files changed, 209 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index 823e721..f36b057 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,11 @@ Bare-bones MQTT CLI for Mac and Linux. This project is a personal learning exercise, for diving into MQTT internals and network programming. On the small chance that you're considering actually using this client....just use one of the real ones listed below. +## Publish to a topic +``` +mqtt pub -t hello/topic -a localhost:1883 -c clientId '{"hello": "world"}' +``` + ``` Usage: mqtt pub [-v] -t -a
-c Options: @@ -14,13 +19,23 @@ Options: -h, --help Show this help message ``` -## Publish to a topic +## Subscribe to a topic + +> :alert: Work in progress. Subscribe request is sent, but still need to actually listen for payloads. + ``` -mqtt pub -t hello/topic -a localhost:1883 -c clientId '{"hello": "world"}' +mqtt sub -t hello/topic -a localhost:1883 -c clientId ``` -## Subscribe to a topic -WIP +``` +Usage: mqtt sub [-v] -t -a
-c +Options: + -t, --topic Specify the topic to publish to (required) + -a, --address
[:] address to connect to (required) + -c, --client-id Client identifier + -v, --verbose Enable verbose output + -h, --help Show this help message +``` ## Features diff --git a/include/Command.h b/include/Command.h index 48e7c9a..c95bd9c 100644 --- a/include/Command.h +++ b/include/Command.h @@ -19,5 +19,6 @@ class Command { bool connect(); bool publish(); + bool subscribe(); }; } // namespace MqttClient \ No newline at end of file diff --git a/include/Context.h b/include/Context.h index 0045929..ee1ec27 100644 --- a/include/Context.h +++ b/include/Context.h @@ -23,7 +23,7 @@ struct ParseResult { ParseResult parseContext(int argc, char *argv[]) noexcept; -const std::string USAGE = R"( +const std::string USAGE_PUB = R"( Usage: mqtt pub [-v] -t -a
-c Options: -t, --topic Specify the topic to publish to (required) @@ -36,4 +36,17 @@ Usage: mqtt pub [-v] -t -a
-c mqtt pub -t hello/topic -v -a 127.0.0.1:8883 -c myclient '{"hello": "world"}' )"; +const std::string USAGE_SUB = R"( +Usage: mqtt sub [-v] -t -a
-c +Options: + -t, --topic Specify the topic to publish to (required) + -a, --address
[:] address to connect to (required) + -c, --client-id Client identifier + -v, --verbose Enable verbose output + -h, --help Show this help message + +Example: + mqtt sub -t hello/topic -v -a 127.0.0.1:8883 -c myclient +)"; + }; // namespace MqttClient \ No newline at end of file diff --git a/include/Network.h b/include/Network.h index b787444..948ee5e 100644 --- a/include/Network.h +++ b/include/Network.h @@ -5,7 +5,7 @@ namespace MqttClient { -using Payload = std::vector; +using Payload = std::vector; class Network { public: diff --git a/include/Packet.h b/include/Packet.h index fde5275..2ca4a1e 100644 --- a/include/Packet.h +++ b/include/Packet.h @@ -13,6 +13,7 @@ class PacketBuilder { PacketBuilder(std::shared_ptr context); bool connect(Payload &payload); // TODO more specific error handling? bool publish(Payload &payload); + bool subscribe(Payload &payload); private: std::shared_ptr mContext; diff --git a/src/Command.cpp b/src/Command.cpp index 9040212..5287cc2 100644 --- a/src/Command.cpp +++ b/src/Command.cpp @@ -29,7 +29,7 @@ void Command::execute() { if (mContext->command == "pub") { publish(); } else if (mContext->command == "sub") { - // TODO + subscribe(); } } @@ -81,9 +81,50 @@ bool Command::publish() { std::cout << "PUBLISH sent\n"; } + // TODO read PUBACK + return true; +} + +bool Command::subscribe() { + Payload subscribe; + if (!mPacketBuilder.subscribe(subscribe)) { + std::cerr << "Unable to build SUBSCRIBE packet\n"; + return false; + } + + if (!mNetwork.netSend(subscribe)) { + std::cerr << "Unable to send SUBSCRIBE: " << strerror(errno) << "\n"; + return false; + } + + if (mContext->verbose) { + std::cout << "SUBSCRIBE sent\n"; + } + + Payload subAck; + if (!mNetwork.netRecv(subAck, 4)) { + std::cerr << "SUBACK not received\n"; + return false; + } + + unsigned char packetType = subAck[0]; + if (packetType != 0b10010000) { + std::cerr << "Another packet received instead of SUBACK\n"; + return false; + } + + if (subAck[3] > 2) { + int ret = subAck[3]; + std::cerr << "Subscription failed. return code: " << ret << "\n"; + return false; + } + + if (mContext->verbose) { + std::cout << "SUBACK sent\n"; + } - // PUBACK + return true; } } // namespace MqttClient diff --git a/src/Context.cpp b/src/Context.cpp index a5baf45..3dca551 100644 --- a/src/Context.cpp +++ b/src/Context.cpp @@ -35,7 +35,7 @@ ParseResult parseContext(int argc, char *argv[]) noexcept { context.verbose = true; break; case 'h': - return {ParseResultCode::HELP}; + return {ParseResultCode::HELP, {}, context}; case '?': return {ParseResultCode::FAILURE}; default: @@ -43,39 +43,54 @@ ParseResult parseContext(int argc, char *argv[]) noexcept { } } - if (context.command != "pub") { - return {ParseResultCode::FAILURE, - "Unknown command: " + context.command}; + if (context.command != "pub" && + context.command != "sub") { // TODO case-insensitive + return {ParseResultCode::FAILURE, "Unknown command: " + context.command, + context}; } if (context.topic.empty()) { // TODO more validations - return {ParseResultCode::FAILURE, "Error: Topic is required"}; + return {ParseResultCode::FAILURE, "Error: Topic is required", context}; } if (context.address.empty()) { // TODO more validations - return {ParseResultCode::FAILURE, "Error: Address is required"}; + return {ParseResultCode::FAILURE, "Error: Address is required", + context}; } if (context.clientId.empty()) { // TODO more validations - return {ParseResultCode::FAILURE, "Error: Client id is required"}; + return {ParseResultCode::FAILURE, "Error: Client id is required", + context}; + } + if (context.clientId.size() > 245) { + return {ParseResultCode::FAILURE, "Error: Client id is too long", + context}; } auto numMessages = argc - 1 - optind; - if (numMessages == 0) { - return {ParseResultCode::FAILURE, "Error: Message is required"}; - } - if (numMessages > 1) { - return {ParseResultCode::FAILURE, - "Error: " + std::to_string(numMessages) + - " messages were provided. Currently, only 1 message at a " - "time is supported"}; + if (context.command == "sub") { + if (numMessages != 0) { + return {ParseResultCode::FAILURE, + "Error: Message is not allowed for sub command", context}; + } + } else { + if (numMessages == 0) { + return {ParseResultCode::FAILURE, "Error: Message is required", + context}; + } + if (numMessages > 1) { + return {ParseResultCode::FAILURE, + "Error: " + std::to_string(numMessages) + + " messages were provided. Only 1 message at a " + "time is supported", + context}; + } + context.message = argv[optind + 1]; } - context.message = argv[optind + 1]; - return {ParseResultCode::SUCCESS, {}, context}; } diff --git a/src/Packet.cpp b/src/Packet.cpp index ae1fff7..fe8ee5b 100644 --- a/src/Packet.cpp +++ b/src/Packet.cpp @@ -6,62 +6,117 @@ namespace MqttClient { PacketBuilder::PacketBuilder(std::shared_ptr context) : mContext(std::move(context)) {}; +// TODO will topic, will message, user name, password, clean session +// TODO support MQTT5 bool PacketBuilder::connect(Payload &payload) { - char variableHeaderLength = 10; // TODO const - - // client id length needs to fit in the second byte of the connect - // packet (remainingLength) - if (mContext->clientId.size() > - 255 - variableHeaderLength) { // TODO can this just be moved to - // earlier validation? - return false; // TODO error handling - } - - // supported payload fields: client identifier - // TODO will topic, will message, user name, password, clean session - // TODO support MQTT5 - // BEGIN FIXED HEADER - payload.push_back(0b00010000); // MQTT control packet type (1) - payload.push_back(variableHeaderLength + 2 + - mContext->clientId.size()); // remainingLength + // ------------------ + // MQTT control packet type (1) + payload.push_back(0b00010000); + // remaining length + payload.push_back(10 + (2 + mContext->clientId.size())); + // BEGIN VARIABLE HEADER - payload.push_back(0); // MQTT length MSB - payload.push_back(4); // MQTT length LSB + // --------------------- + // MQTT length MSB + payload.push_back(0); + // MQTT length LSB + payload.push_back(4); payload.push_back('M'); payload.push_back('Q'); payload.push_back('T'); payload.push_back('T'); - payload.push_back(4); // protocol level (MQTT 3.1.1) - payload.push_back( - 0b00000010); // connect flags, just cleanSession=true for now - payload.push_back(0); // keep alive MSB - payload.push_back(0); // keep alive LSB + // protocol level (MQTT 3.1.1) + payload.push_back(4); + // connect flags + // cleanSession=true + // TODO support other flags + payload.push_back(0b00000010); + // keep alive MSB + // TODO + payload.push_back(0); + // keep alive LSB + // TODO + payload.push_back(0); + // BEGIN PAYLOAD + // ------------- + // client id length MSB + // TODO + payload.push_back(0); + // client id length LSB + payload.push_back(mContext->clientId.size()); // client id - payload.push_back(0); // TODO client id length MSB - payload.push_back(mContext->clientId.size()); // client id length LSB for (size_t i = 0; i < mContext->clientId.size(); ++i) { payload.push_back(mContext->clientId[i]); } + return true; } bool PacketBuilder::publish(Payload &payload) { // BEGIN FIXED HEADER - payload.push_back(0b00110000); // MQTT control packet type (3) TODO support - // dup , qos, retain - payload.push_back(2 + mContext->topic.size() + - mContext->message.size()); // remainingLength + // ------------------ + // MQTT control packet type (3) + // TODO support dup , qos, retain + payload.push_back(0b00110000); + // remaining length + payload.push_back((2 + mContext->topic.size()) + mContext->message.size()); + // BEGIN VARIABLE HEADER - payload.push_back(0); // TODO topic length MSB - payload.push_back(mContext->topic.size()); // topic length LSB + // --------------------- + // topic length MSB + // TODO + payload.push_back(0); + // topic length LSB + payload.push_back(mContext->topic.size()); + // topic for (size_t i = 0; i < mContext->topic.size(); ++i) { payload.push_back(mContext->topic[i]); } + + // BEGIN PAYLOAD + // ------------- for (size_t i = 0; i < mContext->message.size(); ++i) { payload.push_back(mContext->message[i]); } + + return true; +} + +bool PacketBuilder::subscribe(Payload &payload) { + // BEGIN FIXED HEADER + // ------------------ + // MQTT control packet type (8) + payload.push_back(0b10000010); + // remaining length + payload.push_back(2 + (3 + mContext->topic.size())); + + // BEGIN VARIABLE HEADER + // --------------------- + // packet identifier MSB + // TODO + payload.push_back(0); + // packet identifier LSB + // TODO support a real identifier + payload.push_back(1); + + // BEGIN PAYLOAD + // ------------- + // topic length MSB + // TODO + payload.push_back(0); + // topic length LSB + payload.push_back(mContext->topic.size()); + // topic + for (size_t i = 0; i < mContext->topic.size(); ++i) { + payload.push_back(mContext->topic[i]); + } + // topic QoS + // TODO + payload.push_back(0); + return true; } + } // namespace MqttClient \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 1452d92..66ff56d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -11,11 +11,19 @@ int main(int argc, char *argv[]) { auto res = parseContext(argc, argv); switch (res.code) { case ParseResultCode::HELP: - std::cout << USAGE; + if (res.context.command == "sub") { + std::cout << USAGE_SUB; + } else { + std::cout << USAGE_PUB; + } return 0; case ParseResultCode::FAILURE: std::cerr << res.error << "\n"; - std::cout << USAGE; + if (res.context.command == "sub") { + std::cout << USAGE_SUB; + } else { + std::cout << USAGE_PUB; + } return 1; case ParseResultCode::SUCCESS: // carry on diff --git a/system-test.sh b/system-test.sh index cd845c9..b0b5089 100755 --- a/system-test.sh +++ b/system-test.sh @@ -1,3 +1,4 @@ #!/usr/bin/env bash set -xe ./build/mqtt pub -t hello/topic -v -a broker.emqx.io:1883 -c client '{"hello": "world"}' +./build/mqtt sub -t hello/topic -v -a broker.emqx.io:1883 -c client