diff --git a/README.md b/README.md index 13b0223..8b63911 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # MQTT Client -WIP +Bare-bones MQTT CLI for Mac and Linux. ## Publish to a topic ``` @@ -11,3 +11,9 @@ mqtt pub -t hello/topic '{"hello": "world"}' ``` mqtt sub -t hello/topic ``` + +## Related Projects + +Here are some real-world MQTT clients to check out: + +* [MQTTX](https://mqttx.app/cli) diff --git a/include/Command.h b/include/Command.h new file mode 100644 index 0000000..9f92550 --- /dev/null +++ b/include/Command.h @@ -0,0 +1,22 @@ +#pragma once + +#include "Context.h" +#include "Network.h" +#include "Packet.h" +#include + +namespace MqttClient { + +class Command { + public: + Command(std::shared_ptr context); + void execute(); + + private: + std::shared_ptr mContext; + PacketBuilder mPacketBuilder; + Network mNetwork; + + bool connect(); +}; +} // namespace MqttClient \ No newline at end of file diff --git a/include/Network.h b/include/Network.h new file mode 100644 index 0000000..00f1c97 --- /dev/null +++ b/include/Network.h @@ -0,0 +1,27 @@ +#pragma once +#include "Context.h" +#include +#include + +namespace MqttClient { + +using Payload = std::vector; + +class Network { + public: + Network(std::shared_ptr context); + ~Network(); + + bool netSend(Payload &payload); + bool netRecv(Payload &payload, size_t len); + + private: + std::shared_ptr mContext; + int mSock; + bool mConnected = false; + + bool netConnect(); + void netClose(); +}; + +} // namespace MqttClient \ No newline at end of file diff --git a/include/Packet.h b/include/Packet.h new file mode 100644 index 0000000..7059f06 --- /dev/null +++ b/include/Packet.h @@ -0,0 +1,19 @@ +#pragma once + +#include "Context.h" +#include "Network.h" +#include +#include + +namespace MqttClient { + +class PacketBuilder { + + public: + PacketBuilder(std::shared_ptr context); + bool connect(Payload &payload); // TODO more specific error handling? + + private: + std::shared_ptr mContext; +}; +} // namespace MqttClient \ No newline at end of file diff --git a/src/Command.cpp b/src/Command.cpp new file mode 100644 index 0000000..83f5412 --- /dev/null +++ b/src/Command.cpp @@ -0,0 +1,70 @@ +#include "Command.h" +#include "Context.h" +#include "Packet.h" +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace MqttClient { + +Command::Command(std::shared_ptr context) + : mContext(std::move(context)), mPacketBuilder(mContext), + mNetwork(mContext) {} + +void Command::execute() { + if (mContext->verbose) { + std::cout << "Executing " << mContext->command << " command\n"; + } + + if (!connect()) { + return; + } + + if (mContext->command == "pub") { + // TODO + } + + if (mContext->command == "sub") { + // TODO + } +} + +bool Command::connect() { + Payload conn; + if (!mPacketBuilder.connect(conn)) { + std::cerr << "Unable to build CONNECT packet"; + return false; + } + + if (!mNetwork.netSend(conn)) { + std::cerr << "unable to send CONNECT: " << strerror(errno) << "\n"; + return false; + } + + if (mContext->verbose) { + std::cout << "CONNECT sent\n"; + } + + Payload connAck; + if (!mNetwork.netRecv(connAck, 4) || connAck[0] != 0b00100000) { + std::cerr << "CONNACK not received"; + return false; + } + + bool sessionPresent = connAck[2]; + int returnCode = connAck[3]; + + if (mContext->verbose) { + std::cout << "CONNACK received! return code: " << returnCode + << ", session present: " << sessionPresent << "\n"; + } + return true; +} + +} // namespace MqttClient diff --git a/src/Network.cpp b/src/Network.cpp new file mode 100644 index 0000000..939207b --- /dev/null +++ b/src/Network.cpp @@ -0,0 +1,100 @@ +#include "Network.h" +#include "Command.h" +#include "Context.h" +#include "Packet.h" +#include +#include +#include + +#include +#include +#include +#include + +namespace MqttClient { +Network::Network(std::shared_ptr context) + : mContext(std::move(context)) {}; +Network::~Network() { netClose(); } + +bool Network::netSend(Payload &payload) { + if (!netConnect()) { + return false; + } + if (send(mSock, payload.data(), payload.size(), 0) < 0) { + netClose(); + return false; + } + return true; +} + +bool Network::netRecv(Payload &payload, size_t len) { + if (!netConnect()) { + return false; + } + + payload.reserve(len); + + auto totalReceived = 0; + while (totalReceived < len) { + auto received = + recv(mSock, payload.data() + totalReceived, len - totalReceived, 0); + if (received == 0) { + netClose(); + return false; + } + totalReceived += received; + } + + return true; +} + +bool Network::netConnect() { + if (mConnected) { + return true; + } + + // TODO support ipv4 + int sock = socket(AF_INET6, SOCK_STREAM, 0); + if (sock < 0) { + std::cerr << "unable to create socket: " << strerror(errno) << "\n"; + return false; + } + mSock = sock; + + // TODO move this into context + std::string host = mContext->address; + std::string port = "1883"; + size_t pos = mContext->address.find_last_of(':'); + if (pos != std::string::npos) { + host = mContext->address.substr(0, pos); + port = mContext->address.substr(pos + 1); + } + + // TODO resolve localhost, hosts + sockaddr_in6 server_addr; + std::memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin6_family = AF_INET6; + server_addr.sin6_port = htons(std::stoi(port)); + if (inet_pton(AF_INET6, host.c_str(), &server_addr.sin6_addr) <= 0) { + netClose(); + std::cerr << "unable to create address: " << strerror(errno) << "\n"; + return false; + } + + if (connect(mSock, (sockaddr *)&server_addr, sizeof(server_addr)) < 0) { + netClose(); + std::cerr << "unable to connect to server: " << strerror(errno) << "\n"; + return false; + } + + mConnected = true; + return true; +} + +void Network::netClose() { + if (mSock != 0) { + close(mSock); + mConnected = false; + } +} +} // namespace MqttClient \ No newline at end of file diff --git a/src/Packet.cpp b/src/Packet.cpp new file mode 100644 index 0000000..8e95241 --- /dev/null +++ b/src/Packet.cpp @@ -0,0 +1,49 @@ +#include "Packet.h" +#include "Context.h" +#include + +namespace MqttClient { +PacketBuilder::PacketBuilder(std::shared_ptr context) + : mContext(std::move(context)) {}; + +bool PacketBuilder::connect(Payload &payload) { + char variableHeaderLength = 10; // TODO const + + // client id length needs to fit in the second byte of the connect + // packet (remainingLength) + if (mContext->clientId.size() > + 255 - variableHeaderLength) { // TODO can this just be moved to + // earlier validation? + return false; // TODO error handling + } + + // supported payload fields: client identifier + // TODO will topic, will message, user name, password, clean session + // TODO support MQTT5 + + // BEGIN FIXED HEADER + payload.push_back(0b00010000); // MQTT control packet type (1) + payload.push_back(variableHeaderLength + 2 + + mContext->clientId.size()); // remainingLength + // BEGIN VARIABLE HEADER + payload.push_back(0); // MQTT length MSB + payload.push_back(4); // MQTT length LSB + payload.push_back('M'); + payload.push_back('Q'); + payload.push_back('T'); + payload.push_back('T'); + payload.push_back(4); // protocol level (MQTT 3.1.1) + payload.push_back( + 0b00000010); // connect flags, just cleanSession=true for now + payload.push_back(0); // keep alive MSB + payload.push_back(0); // keep alive LSB + // BEGIN PAYLOAD + // client id + payload.push_back(0); // TODO client id length MSB + payload.push_back(mContext->clientId.size()); // client id length LSB + for (size_t c = 0; c < mContext->clientId.size(); ++c) { + payload.push_back(mContext->clientId[c]); + } + return true; +} +} // namespace MqttClient \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 5839e08..1452d92 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,6 +1,8 @@ +#include "Command.h" #include "Context.h" #include #include +#include #include using namespace MqttClient; @@ -30,7 +32,8 @@ int main(int argc, char *argv[]) { << ", message=" << context.message << "]\n"; } - // TODO implement commands + auto command = new Command(std::make_shared(context)); + command->execute(); return 0; }