Skip to content

Commit

Permalink
working pub sub
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 committed Jun 7, 2024
1 parent caadf6f commit 28a560b
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 46 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build
.vscode
system-test-output
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ Options:

## Subscribe to a topic

> :warning: Work in progress. Subscribe request is sent, but still need to actually listen for payloads.
```
mqtt sub -t hello/topic -a localhost:1883 -c clientId
```
Expand All @@ -44,9 +42,10 @@ Options:
| MQTT3 | x | |
| MQTT5 | | x |
| QOS0 | x | |
| QOS1/2 | | x |
| QOS1 | x | |
| QOS2 | | x |
| PUBLISH | x | |
| SUBSCRIBE | | x |
| SUBSCRIBE | x | |
| Various CONNECT options | | x |
| SSL | | x |

Expand Down
1 change: 1 addition & 0 deletions include/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct Context {
std::string message;
std::string address;
std::string clientId;
uint8_t qos; // TODO make configurable
bool verbose;
};

Expand Down
2 changes: 1 addition & 1 deletion include/Network.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace MqttClient {

using Payload = std::vector<unsigned char>;
using Payload = std::vector<uint8_t>;

class Network {
public:
Expand Down
3 changes: 2 additions & 1 deletion include/Packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ class PacketBuilder {

public:
PacketBuilder(std::shared_ptr<Context> context);
bool connect(Payload &payload); // TODO more specific error handling?
bool connect(Payload &payload);
bool publish(Payload &payload);
bool subscribe(Payload &payload);

private:
std::shared_ptr<Context> mContext;
uint16_t packetId();
};
} // namespace MqttClient
147 changes: 125 additions & 22 deletions src/Command.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "Command.h"
#include "Context.h"
#include "Interrupt.h"
#include "Packet.h"
#include <cstring>
#include <iostream>
Expand All @@ -19,7 +20,8 @@ Command::Command(std::shared_ptr<Context> context)

void Command::execute() {
if (mContext->verbose) {
std::cout << "Executing " << mContext->command << " command\n";
std::cout << "Executing " << mContext->command << " command"
<< std::endl;
}

if (!connect()) {
Expand All @@ -36,22 +38,23 @@ void Command::execute() {
bool Command::connect() {
Payload conn;
if (!mPacketBuilder.connect(conn)) {
std::cerr << "Unable to build CONNECT packet\n";
std::cerr << "Unable to build CONNECT packet" << std::endl;
return false;
}

if (!mNetwork.netSend(conn)) {
std::cerr << "Unable to send CONNECT: " << strerror(errno) << "\n";
std::cerr << "Unable to send CONNECT: " << strerror(errno) << ""
<< std::endl;
return false;
}

if (mContext->verbose) {
std::cout << "CONNECT sent\n";
std::cout << "CONNECT sent" << std::endl;
}

Payload connAck;
if (!mNetwork.netRecv(connAck, 4) || connAck[0] != 0b00100000) {
std::cerr << "CONNACK not received\n";
std::cerr << "CONNACK not received" << std::endl;
return false;
}

Expand All @@ -60,68 +63,168 @@ bool Command::connect() {

if (mContext->verbose) {
std::cout << "CONNACK received! return code: " << returnCode
<< ", session present: " << sessionPresent << "\n";
<< ", session present: " << sessionPresent << "" << std::endl;
}
return true;
}

bool Command::publish() {
Payload publish;
if (!mPacketBuilder.publish(publish)) {
std::cerr << "Unable to build PUBLISH packet\n";
std::cerr << "Unable to build PUBLISH packet" << std::endl;
return false;
}

if (!mNetwork.netSend(publish)) {
std::cerr << "Unable to send PUBLISH: " << strerror(errno) << "\n";
std::cerr << "Unable to send PUBLISH: " << strerror(errno) << ""
<< std::endl;
return false;
}

if (mContext->verbose) {
std::cout << "PUBLISH sent\n";
std::cout << "PUBLISH sent" << std::endl;
}

// TODO support QoS 2
if (mContext->qos == 1) {
Payload pubAck;
if (!mNetwork.netRecv(pubAck, 4)) {
std::cerr << "PUBACK not received" << std::endl;
return false;
}
if (pubAck[0] != 0b01000000) {
std::cerr << "Another packet received instead of PUBACK"
<< std::endl;
return false;
}
if (mContext->verbose) {
std::cout << "PUBACK received" << std::endl;
}
}

// TODO read PUBACK

return true;
}

bool Command::subscribe() {
Payload subscribe;
if (!mPacketBuilder.subscribe(subscribe)) {
std::cerr << "Unable to build SUBSCRIBE packet\n";
std::cerr << "Unable to build SUBSCRIBE packet" << std::endl;
return false;
}

if (!mNetwork.netSend(subscribe)) {
std::cerr << "Unable to send SUBSCRIBE: " << strerror(errno) << "\n";
std::cerr << "Unable to send SUBSCRIBE: " << strerror(errno) << ""
<< std::endl;
return false;
}

if (mContext->verbose) {
std::cout << "SUBSCRIBE sent\n";
std::cout << "SUBSCRIBE sent" << std::endl;
}

Payload subAck;
if (!mNetwork.netRecv(subAck, 4)) {
std::cerr << "SUBACK not received\n";
if (!mNetwork.netRecv(subAck, 5)) {
std::cerr << "SUBACK not received" << std::endl;
return false;
}

unsigned char packetType = subAck[0];
auto packetType = subAck[0];
if (packetType != 0b10010000) {
std::cerr << "Another packet received instead of SUBACK\n";
std::cerr << "Another packet received instead of SUBACK" << std::endl;
return false;
}

if (subAck[3] > 2) {
int ret = subAck[3];
std::cerr << "Subscription failed. return code: " << ret << "\n";
if (subAck[4] != mContext->qos) {
std::cerr << "Subscription failed" << std::endl;
return false;
}

if (mContext->verbose) {
std::cout << "SUBACK sent\n";
std::cout << "SUBACK received" << std::endl;
}

// receive PUBLISH messages until interrupted
while (!interrupted.load()) {
uint8_t qos;
{
Payload packet;
if (!mNetwork.netRecv(packet, 1)) {
if (mContext->verbose) {
std::cerr << "Unable to receive PUBLISH packet"
<< std::endl;
}
continue;
}
if (interrupted.load()) {
return true;
}
packetType = packet[0] & 0b11110000;
qos = packet[0] & 0b00000110;

if (packetType != 0b00110000) {
if (mContext->verbose) {
std::cerr << "Unexpected packet type received: "
<< std::bitset<8>(packetType) << "" << std::endl;
}
continue;
}
}

uint8_t remainingLen;
{
Payload packet;
if (!mNetwork.netRecv(packet, 1)) {
if (mContext->verbose) {
std::cerr << "Unable to receive remaining length"
<< std::endl;
}
continue;
}
if (interrupted.load()) {
return true;
}
remainingLen = packet[0];
}

std::string message;
{
Payload packet;
if (!mNetwork.netRecv(packet, remainingLen)) {
if (mContext->verbose) {
std::cerr << "Unable to receive rest of PUBLISH packet"
<< std::endl;
}
continue;
}
if (interrupted.load()) {
return true;
}

int messageStart = 0;

uint16_t topicLen = packet[1] + (packet[0] << 8);
messageStart += 2;
messageStart += topicLen;

if (qos > 0) {
messageStart += 2; // packet id
}

if (messageStart == packet.size()) {
message = "";
} else if (messageStart < packet.size()) {
message =
std::string(packet.begin() + messageStart, packet.end());
} else {
if (mContext->verbose) {
std::cerr << "Not enough buffer for message payload"
<< std::endl;
}
continue;
}
}

std::cout << message << std::endl;
}

return true;
Expand Down
2 changes: 2 additions & 0 deletions src/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ ParseResult parseContext(int argc, char *argv[]) noexcept {
context.message = argv[optind + 1];
}

context.qos = 1; // TODO make configurable

return {ParseResultCode::SUCCESS, {}, context};
}

Expand Down
9 changes: 5 additions & 4 deletions src/Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ bool Network::netRecv(Payload &payload, size_t len) {
while (totalReceived < len) {
auto received =
recv(mSock, payload.data() + totalReceived, len - totalReceived, 0);
if (received == 0) {
if (received <= 0) {
netClose();
return false;
}
Expand Down Expand Up @@ -70,7 +70,8 @@ bool Network::netConnect() {
hints.ai_protocol = IPPROTO_TCP;

if (getaddrinfo(host.data(), port.data(), &hints, &addrs) != 0) {
std::cerr << "unable to resolve address: " << strerror(errno) << "\n";
std::cerr << "unable to resolve address: " << strerror(errno) << ""
<< std::endl;
return false;
}

Expand All @@ -81,8 +82,8 @@ bool Network::netConnect() {
}

if (connect(mSock, addr->ai_addr, addr->ai_addrlen) < 0) {
std::cerr << "unable to connect to host: " << strerror(errno)
<< "\n";
std::cerr << "unable to connect to host: " << strerror(errno) << ""
<< std::endl;
freeaddrinfo(addrs);
netClose();
return false;
Expand Down
Loading

0 comments on commit 28a560b

Please sign in to comment.