Skip to content

Commit

Permalink
add CONNECT and CONNACK (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 authored Jun 3, 2024
1 parent 974b204 commit 65b04cb
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 2 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# MQTT Client

WIP
Bare-bones MQTT CLI for Mac and Linux.

## Publish to a topic
```
Expand All @@ -11,3 +11,9 @@ mqtt pub -t hello/topic '{"hello": "world"}'
```
mqtt sub -t hello/topic
```

## Related Projects

Here are some real-world MQTT clients to check out:

* [MQTTX](https://mqttx.app/cli)
22 changes: 22 additions & 0 deletions include/Command.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include "Context.h"
#include "Network.h"
#include "Packet.h"
#include <memory>

namespace MqttClient {

class Command {
public:
Command(std::shared_ptr<Context> context);
void execute();

private:
std::shared_ptr<Context> mContext;
PacketBuilder mPacketBuilder;
Network mNetwork;

bool connect();
};
} // namespace MqttClient
27 changes: 27 additions & 0 deletions include/Network.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once
#include "Context.h"
#include <memory>
#include <vector>

namespace MqttClient {

using Payload = std::vector<char>;

class Network {
public:
Network(std::shared_ptr<Context> context);
~Network();

bool netSend(Payload &payload);
bool netRecv(Payload &payload, size_t len);

private:
std::shared_ptr<Context> mContext;
int mSock;
bool mConnected = false;

bool netConnect();
void netClose();
};

} // namespace MqttClient
19 changes: 19 additions & 0 deletions include/Packet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include "Context.h"
#include "Network.h"
#include <memory>
#include <vector>

namespace MqttClient {

class PacketBuilder {

public:
PacketBuilder(std::shared_ptr<Context> context);
bool connect(Payload &payload); // TODO more specific error handling?

private:
std::shared_ptr<Context> mContext;
};
} // namespace MqttClient
70 changes: 70 additions & 0 deletions src/Command.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include "Command.h"
#include "Context.h"
#include "Packet.h"
#include <cstring>
#include <iostream>
#include <memory>
#include <vector>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

namespace MqttClient {

Command::Command(std::shared_ptr<Context> context)
: mContext(std::move(context)), mPacketBuilder(mContext),
mNetwork(mContext) {}

void Command::execute() {
if (mContext->verbose) {
std::cout << "Executing " << mContext->command << " command\n";
}

if (!connect()) {
return;
}

if (mContext->command == "pub") {
// TODO
}

if (mContext->command == "sub") {
// TODO
}
}

bool Command::connect() {
Payload conn;
if (!mPacketBuilder.connect(conn)) {
std::cerr << "Unable to build CONNECT packet";
return false;
}

if (!mNetwork.netSend(conn)) {
std::cerr << "unable to send CONNECT: " << strerror(errno) << "\n";
return false;
}

if (mContext->verbose) {
std::cout << "CONNECT sent\n";
}

Payload connAck;
if (!mNetwork.netRecv(connAck, 4) || connAck[0] != 0b00100000) {
std::cerr << "CONNACK not received";
return false;
}

bool sessionPresent = connAck[2];
int returnCode = connAck[3];

if (mContext->verbose) {
std::cout << "CONNACK received! return code: " << returnCode
<< ", session present: " << sessionPresent << "\n";
}
return true;
}

} // namespace MqttClient
100 changes: 100 additions & 0 deletions src/Network.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include "Network.h"
#include "Command.h"
#include "Context.h"
#include "Packet.h"
#include <cstring>
#include <iostream>
#include <vector>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

namespace MqttClient {
Network::Network(std::shared_ptr<Context> context)
: mContext(std::move(context)) {};
Network::~Network() { netClose(); }

bool Network::netSend(Payload &payload) {
if (!netConnect()) {
return false;
}
if (send(mSock, payload.data(), payload.size(), 0) < 0) {
netClose();
return false;
}
return true;
}

bool Network::netRecv(Payload &payload, size_t len) {
if (!netConnect()) {
return false;
}

payload.reserve(len);

auto totalReceived = 0;
while (totalReceived < len) {
auto received =
recv(mSock, payload.data() + totalReceived, len - totalReceived, 0);
if (received == 0) {
netClose();
return false;
}
totalReceived += received;
}

return true;
}

bool Network::netConnect() {
if (mConnected) {
return true;
}

// TODO support ipv4
int sock = socket(AF_INET6, SOCK_STREAM, 0);
if (sock < 0) {
std::cerr << "unable to create socket: " << strerror(errno) << "\n";
return false;
}
mSock = sock;

// TODO move this into context
std::string host = mContext->address;
std::string port = "1883";
size_t pos = mContext->address.find_last_of(':');
if (pos != std::string::npos) {
host = mContext->address.substr(0, pos);
port = mContext->address.substr(pos + 1);
}

// TODO resolve localhost, hosts
sockaddr_in6 server_addr;
std::memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin6_family = AF_INET6;
server_addr.sin6_port = htons(std::stoi(port));
if (inet_pton(AF_INET6, host.c_str(), &server_addr.sin6_addr) <= 0) {
netClose();
std::cerr << "unable to create address: " << strerror(errno) << "\n";
return false;
}

if (connect(mSock, (sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
netClose();
std::cerr << "unable to connect to server: " << strerror(errno) << "\n";
return false;
}

mConnected = true;
return true;
}

void Network::netClose() {
if (mSock != 0) {
close(mSock);
mConnected = false;
}
}
} // namespace MqttClient
49 changes: 49 additions & 0 deletions src/Packet.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "Packet.h"
#include "Context.h"
#include <memory>

namespace MqttClient {
PacketBuilder::PacketBuilder(std::shared_ptr<Context> context)
: mContext(std::move(context)) {};

bool PacketBuilder::connect(Payload &payload) {
char variableHeaderLength = 10; // TODO const

// client id length needs to fit in the second byte of the connect
// packet (remainingLength)
if (mContext->clientId.size() >
255 - variableHeaderLength) { // TODO can this just be moved to
// earlier validation?
return false; // TODO error handling
}

// supported payload fields: client identifier
// TODO will topic, will message, user name, password, clean session
// TODO support MQTT5

// BEGIN FIXED HEADER
payload.push_back(0b00010000); // MQTT control packet type (1)
payload.push_back(variableHeaderLength + 2 +
mContext->clientId.size()); // remainingLength
// BEGIN VARIABLE HEADER
payload.push_back(0); // MQTT length MSB
payload.push_back(4); // MQTT length LSB
payload.push_back('M');
payload.push_back('Q');
payload.push_back('T');
payload.push_back('T');
payload.push_back(4); // protocol level (MQTT 3.1.1)
payload.push_back(
0b00000010); // connect flags, just cleanSession=true for now
payload.push_back(0); // keep alive MSB
payload.push_back(0); // keep alive LSB
// BEGIN PAYLOAD
// client id
payload.push_back(0); // TODO client id length MSB
payload.push_back(mContext->clientId.size()); // client id length LSB
for (size_t c = 0; c < mContext->clientId.size(); ++c) {
payload.push_back(mContext->clientId[c]);
}
return true;
}
} // namespace MqttClient
5 changes: 4 additions & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "Command.h"
#include "Context.h"
#include <getopt.h>
#include <iostream>
#include <memory>
#include <string>

using namespace MqttClient;
Expand Down Expand Up @@ -30,7 +32,8 @@ int main(int argc, char *argv[]) {
<< ", message=" << context.message << "]\n";
}

// TODO implement commands
auto command = new Command(std::make_shared<Context>(context));
command->execute();

return 0;
}

0 comments on commit 65b04cb

Please sign in to comment.