Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

working pub sub #13

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build
.vscode
system-test-output
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ Options:

## Subscribe to a topic

> :warning: Work in progress. Subscribe request is sent, but still need to actually listen for payloads.

```
mqtt sub -t hello/topic -a localhost:1883 -c clientId
```
Expand All @@ -44,9 +42,10 @@ Options:
| MQTT3 | x | |
| MQTT5 | | x |
| QOS0 | x | |
| QOS1/2 | | x |
| QOS1 | x | |
| QOS2 | | x |
| PUBLISH | x | |
| SUBSCRIBE | | x |
| SUBSCRIBE | x | |
| Various CONNECT options | | x |
| SSL | | x |

Expand Down
1 change: 1 addition & 0 deletions include/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct Context {
std::string message;
std::string address;
std::string clientId;
uint8_t qos; // TODO make configurable
bool verbose;
};

Expand Down
2 changes: 1 addition & 1 deletion include/Network.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace MqttClient {

using Payload = std::vector<unsigned char>;
using Payload = std::vector<uint8_t>;

class Network {
public:
Expand Down
3 changes: 2 additions & 1 deletion include/Packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ class PacketBuilder {

public:
PacketBuilder(std::shared_ptr<Context> context);
bool connect(Payload &payload); // TODO more specific error handling?
bool connect(Payload &payload);
bool publish(Payload &payload);
bool subscribe(Payload &payload);

private:
std::shared_ptr<Context> mContext;
uint16_t packetId();
};
} // namespace MqttClient
146 changes: 124 additions & 22 deletions src/Command.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "Command.h"
#include "Context.h"
#include "Interrupt.h"
#include "Packet.h"
#include <cstring>
#include <iostream>
Expand All @@ -19,7 +20,8 @@ Command::Command(std::shared_ptr<Context> context)

void Command::execute() {
if (mContext->verbose) {
std::cout << "Executing " << mContext->command << " command\n";
std::cout << "Executing " << mContext->command << " command"
<< std::endl;
}

if (!connect()) {
Expand All @@ -36,22 +38,23 @@ void Command::execute() {
bool Command::connect() {
Payload conn;
if (!mPacketBuilder.connect(conn)) {
std::cerr << "Unable to build CONNECT packet\n";
std::cerr << "Unable to build CONNECT packet" << std::endl;
return false;
}

if (!mNetwork.netSend(conn)) {
std::cerr << "Unable to send CONNECT: " << strerror(errno) << "\n";
std::cerr << "Unable to send CONNECT: " << strerror(errno) << ""
<< std::endl;
return false;
}

if (mContext->verbose) {
std::cout << "CONNECT sent\n";
std::cout << "CONNECT sent" << std::endl;
}

Payload connAck;
if (!mNetwork.netRecv(connAck, 4) || connAck[0] != 0b00100000) {
std::cerr << "CONNACK not received\n";
std::cerr << "CONNACK not received" << std::endl;
return false;
}

Expand All @@ -60,68 +63,167 @@ bool Command::connect() {

if (mContext->verbose) {
std::cout << "CONNACK received! return code: " << returnCode
<< ", session present: " << sessionPresent << "\n";
<< ", session present: " << sessionPresent << "" << std::endl;
}
return true;
}

bool Command::publish() {
Payload publish;
if (!mPacketBuilder.publish(publish)) {
std::cerr << "Unable to build PUBLISH packet\n";
std::cerr << "Unable to build PUBLISH packet" << std::endl;
return false;
}

if (!mNetwork.netSend(publish)) {
std::cerr << "Unable to send PUBLISH: " << strerror(errno) << "\n";
std::cerr << "Unable to send PUBLISH: " << strerror(errno) << ""
<< std::endl;
return false;
}

if (mContext->verbose) {
std::cout << "PUBLISH sent\n";
std::cout << "PUBLISH sent" << std::endl;
}

// TODO support QoS 2
if (mContext->qos == 1) {
Payload pubAck;
if (!mNetwork.netRecv(pubAck, 4)) {
std::cerr << "PUBACK not received" << std::endl;
return false;
}
if (pubAck[0] != 0b01000000) {
std::cerr << "Another packet received instead of PUBACK"
<< std::endl;
return false;
}
if (mContext->verbose) {
std::cout << "PUBACK received" << std::endl;
}
}

// TODO read PUBACK

return true;
}

bool Command::subscribe() {
Payload subscribe;
if (!mPacketBuilder.subscribe(subscribe)) {
std::cerr << "Unable to build SUBSCRIBE packet\n";
std::cerr << "Unable to build SUBSCRIBE packet" << std::endl;
return false;
}

if (!mNetwork.netSend(subscribe)) {
std::cerr << "Unable to send SUBSCRIBE: " << strerror(errno) << "\n";
std::cerr << "Unable to send SUBSCRIBE: " << strerror(errno) << ""
<< std::endl;
return false;
}

if (mContext->verbose) {
std::cout << "SUBSCRIBE sent\n";
std::cout << "SUBSCRIBE sent" << std::endl;
}

Payload subAck;
if (!mNetwork.netRecv(subAck, 4)) {
std::cerr << "SUBACK not received\n";
if (!mNetwork.netRecv(subAck, 5)) {
std::cerr << "SUBACK not received" << std::endl;
return false;
}

unsigned char packetType = subAck[0];
auto packetType = subAck[0];
if (packetType != 0b10010000) {
std::cerr << "Another packet received instead of SUBACK\n";
std::cerr << "Another packet received instead of SUBACK" << std::endl;
return false;
}

if (subAck[3] > 2) {
int ret = subAck[3];
std::cerr << "Subscription failed. return code: " << ret << "\n";
if (subAck[4] != mContext->qos) {
std::cerr << "Subscription failed" << std::endl;
return false;
}

if (mContext->verbose) {
std::cout << "SUBACK sent\n";
std::cout << "SUBACK received" << std::endl;
}

// receive PUBLISH messages until interrupted
while (!interrupted.load()) {
uint8_t qos;
{
Payload packet;
if (!mNetwork.netRecv(packet, 1)) {
if (mContext->verbose) {
std::cerr << "Unable to receive PUBLISH packet"
<< std::endl;
}
continue;
}
if (interrupted.load()) {
return true;
}
packetType = packet[0] & 0b11110000;
qos = packet[0] & 0b00000110;

if (packetType != 0b00110000) {
if (mContext->verbose) {
std::cerr << "Unexpected packet type received";
}
continue;
}
}

uint8_t remainingLen;
{
Payload packet;
if (!mNetwork.netRecv(packet, 1)) {
if (mContext->verbose) {
std::cerr << "Unable to receive remaining length"
<< std::endl;
}
continue;
}
if (interrupted.load()) {
return true;
}
remainingLen = packet[0];
}

std::string message;
{
Payload packet;
if (!mNetwork.netRecv(packet, remainingLen)) {
if (mContext->verbose) {
std::cerr << "Unable to receive rest of PUBLISH packet"
<< std::endl;
}
continue;
}
if (interrupted.load()) {
return true;
}

int messageStart = 0;

uint16_t topicLen = packet[1] + (packet[0] << 8);
messageStart += 2;
messageStart += topicLen;

if (qos > 0) {
messageStart += 2; // packet id
}

if (messageStart == packet.size()) {
message = "";
} else if (messageStart < packet.size()) {
message =
std::string(packet.begin() + messageStart, packet.end());
} else {
if (mContext->verbose) {
std::cerr << "Not enough buffer for message payload"
<< std::endl;
}
continue;
}
}

std::cout << message << std::endl;
}

return true;
Expand Down
2 changes: 2 additions & 0 deletions src/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ ParseResult parseContext(int argc, char *argv[]) noexcept {
context.message = argv[optind + 1];
}

context.qos = 1; // TODO make configurable

return {ParseResultCode::SUCCESS, {}, context};
}

Expand Down
9 changes: 5 additions & 4 deletions src/Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ bool Network::netRecv(Payload &payload, size_t len) {
while (totalReceived < len) {
auto received =
recv(mSock, payload.data() + totalReceived, len - totalReceived, 0);
if (received == 0) {
if (received <= 0) {
netClose();
return false;
}
Expand Down Expand Up @@ -70,7 +70,8 @@ bool Network::netConnect() {
hints.ai_protocol = IPPROTO_TCP;

if (getaddrinfo(host.data(), port.data(), &hints, &addrs) != 0) {
std::cerr << "unable to resolve address: " << strerror(errno) << "\n";
std::cerr << "unable to resolve address: " << strerror(errno) << ""
<< std::endl;
return false;
}

Expand All @@ -81,8 +82,8 @@ bool Network::netConnect() {
}

if (connect(mSock, addr->ai_addr, addr->ai_addrlen) < 0) {
std::cerr << "unable to connect to host: " << strerror(errno)
<< "\n";
std::cerr << "unable to connect to host: " << strerror(errno) << ""
<< std::endl;
freeaddrinfo(addrs);
netClose();
return false;
Expand Down
Loading