Skip to content

Commit

Permalink
add subscribe command (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 authored Jun 7, 2024
1 parent 5aed8b6 commit fff728b
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 59 deletions.
23 changes: 19 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ Bare-bones MQTT CLI for Mac and Linux.

This project is a personal learning exercise, for diving into MQTT internals and network programming. On the small chance that you're considering actually using this client....just use one of the real ones listed below.

## Publish to a topic
```
mqtt pub -t hello/topic -a localhost:1883 -c clientId '{"hello": "world"}'
```

```
Usage: mqtt pub [-v] -t <topic> -a <address> -c <clientId> <message>
Options:
Expand All @@ -14,13 +19,23 @@ Options:
-h, --help Show this help message
```

## Publish to a topic
## Subscribe to a topic

> :alert: Work in progress. Subscribe request is sent, but still need to actually listen for payloads.
```
mqtt pub -t hello/topic -a localhost:1883 -c clientId '{"hello": "world"}'
mqtt sub -t hello/topic -a localhost:1883 -c clientId
```

## Subscribe to a topic
WIP
```
Usage: mqtt sub [-v] -t <topic> -a <address> -c <clientId>
Options:
-t, --topic <topic> Specify the topic to publish to (required)
-a, --address <address> <host>[:<port>] address to connect to (required)
-c, --client-id <client id> Client identifier
-v, --verbose Enable verbose output
-h, --help Show this help message
```

## Features

Expand Down
1 change: 1 addition & 0 deletions include/Command.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ class Command {

bool connect();
bool publish();
bool subscribe();
};
} // namespace MqttClient
15 changes: 14 additions & 1 deletion include/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct ParseResult {

ParseResult parseContext(int argc, char *argv[]) noexcept;

const std::string USAGE = R"(
const std::string USAGE_PUB = R"(
Usage: mqtt pub [-v] -t <topic> -a <address> -c <clientId> <message>
Options:
-t, --topic <topic> Specify the topic to publish to (required)
Expand All @@ -36,4 +36,17 @@ Usage: mqtt pub [-v] -t <topic> -a <address> -c <clientId> <message>
mqtt pub -t hello/topic -v -a 127.0.0.1:8883 -c myclient '{"hello": "world"}'
)";

const std::string USAGE_SUB = R"(
Usage: mqtt sub [-v] -t <topic> -a <address> -c <clientId>
Options:
-t, --topic <topic> Specify the topic to publish to (required)
-a, --address <address> <host>[:<port>] address to connect to (required)
-c, --client-id <client id> Client identifier
-v, --verbose Enable verbose output
-h, --help Show this help message
Example:
mqtt sub -t hello/topic -v -a 127.0.0.1:8883 -c myclient
)";

}; // namespace MqttClient
2 changes: 1 addition & 1 deletion include/Network.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace MqttClient {

using Payload = std::vector<char>;
using Payload = std::vector<unsigned char>;

class Network {
public:
Expand Down
1 change: 1 addition & 0 deletions include/Packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class PacketBuilder {
PacketBuilder(std::shared_ptr<Context> context);
bool connect(Payload &payload); // TODO more specific error handling?
bool publish(Payload &payload);
bool subscribe(Payload &payload);

private:
std::shared_ptr<Context> mContext;
Expand Down
45 changes: 43 additions & 2 deletions src/Command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void Command::execute() {
if (mContext->command == "pub") {
publish();
} else if (mContext->command == "sub") {
// TODO
subscribe();
}
}

Expand Down Expand Up @@ -81,9 +81,50 @@ bool Command::publish() {
std::cout << "PUBLISH sent\n";
}

// TODO read PUBACK

return true;
}

bool Command::subscribe() {
Payload subscribe;
if (!mPacketBuilder.subscribe(subscribe)) {
std::cerr << "Unable to build SUBSCRIBE packet\n";
return false;
}

if (!mNetwork.netSend(subscribe)) {
std::cerr << "Unable to send SUBSCRIBE: " << strerror(errno) << "\n";
return false;
}

if (mContext->verbose) {
std::cout << "SUBSCRIBE sent\n";
}

Payload subAck;
if (!mNetwork.netRecv(subAck, 4)) {
std::cerr << "SUBACK not received\n";
return false;
}

unsigned char packetType = subAck[0];
if (packetType != 0b10010000) {
std::cerr << "Another packet received instead of SUBACK\n";
return false;
}

if (subAck[3] > 2) {
int ret = subAck[3];
std::cerr << "Subscription failed. return code: " << ret << "\n";
return false;
}

if (mContext->verbose) {
std::cout << "SUBACK sent\n";
}

// PUBACK
return true;
}

} // namespace MqttClient
49 changes: 32 additions & 17 deletions src/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,47 +35,62 @@ ParseResult parseContext(int argc, char *argv[]) noexcept {
context.verbose = true;
break;
case 'h':
return {ParseResultCode::HELP};
return {ParseResultCode::HELP, {}, context};
case '?':
return {ParseResultCode::FAILURE};
default:
return {ParseResultCode::FAILURE};
}
}

if (context.command != "pub") {
return {ParseResultCode::FAILURE,
"Unknown command: " + context.command};
if (context.command != "pub" &&
context.command != "sub") { // TODO case-insensitive
return {ParseResultCode::FAILURE, "Unknown command: " + context.command,
context};
}

if (context.topic.empty()) {
// TODO more validations
return {ParseResultCode::FAILURE, "Error: Topic is required"};
return {ParseResultCode::FAILURE, "Error: Topic is required", context};
}

if (context.address.empty()) {
// TODO more validations
return {ParseResultCode::FAILURE, "Error: Address is required"};
return {ParseResultCode::FAILURE, "Error: Address is required",
context};
}

if (context.clientId.empty()) {
// TODO more validations
return {ParseResultCode::FAILURE, "Error: Client id is required"};
return {ParseResultCode::FAILURE, "Error: Client id is required",
context};
}
if (context.clientId.size() > 245) {
return {ParseResultCode::FAILURE, "Error: Client id is too long",
context};
}

auto numMessages = argc - 1 - optind;
if (numMessages == 0) {
return {ParseResultCode::FAILURE, "Error: Message is required"};
}
if (numMessages > 1) {
return {ParseResultCode::FAILURE,
"Error: " + std::to_string(numMessages) +
" messages were provided. Currently, only 1 message at a "
"time is supported"};
if (context.command == "sub") {
if (numMessages != 0) {
return {ParseResultCode::FAILURE,
"Error: Message is not allowed for sub command", context};
}
} else {
if (numMessages == 0) {
return {ParseResultCode::FAILURE, "Error: Message is required",
context};
}
if (numMessages > 1) {
return {ParseResultCode::FAILURE,
"Error: " + std::to_string(numMessages) +
" messages were provided. Only 1 message at a "
"time is supported",
context};
}
context.message = argv[optind + 1];
}

context.message = argv[optind + 1];

return {ParseResultCode::SUCCESS, {}, context};
}

Expand Down
119 changes: 87 additions & 32 deletions src/Packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,62 +6,117 @@ namespace MqttClient {
PacketBuilder::PacketBuilder(std::shared_ptr<Context> context)
: mContext(std::move(context)) {};

// TODO will topic, will message, user name, password, clean session
// TODO support MQTT5
bool PacketBuilder::connect(Payload &payload) {
char variableHeaderLength = 10; // TODO const

// client id length needs to fit in the second byte of the connect
// packet (remainingLength)
if (mContext->clientId.size() >
255 - variableHeaderLength) { // TODO can this just be moved to
// earlier validation?
return false; // TODO error handling
}

// supported payload fields: client identifier
// TODO will topic, will message, user name, password, clean session
// TODO support MQTT5

// BEGIN FIXED HEADER
payload.push_back(0b00010000); // MQTT control packet type (1)
payload.push_back(variableHeaderLength + 2 +
mContext->clientId.size()); // remainingLength
// ------------------
// MQTT control packet type (1)
payload.push_back(0b00010000);
// remaining length
payload.push_back(10 + (2 + mContext->clientId.size()));

// BEGIN VARIABLE HEADER
payload.push_back(0); // MQTT length MSB
payload.push_back(4); // MQTT length LSB
// ---------------------
// MQTT length MSB
payload.push_back(0);
// MQTT length LSB
payload.push_back(4);
payload.push_back('M');
payload.push_back('Q');
payload.push_back('T');
payload.push_back('T');
payload.push_back(4); // protocol level (MQTT 3.1.1)
payload.push_back(
0b00000010); // connect flags, just cleanSession=true for now
payload.push_back(0); // keep alive MSB
payload.push_back(0); // keep alive LSB
// protocol level (MQTT 3.1.1)
payload.push_back(4);
// connect flags
// cleanSession=true
// TODO support other flags
payload.push_back(0b00000010);
// keep alive MSB
// TODO
payload.push_back(0);
// keep alive LSB
// TODO
payload.push_back(0);

// BEGIN PAYLOAD
// -------------
// client id length MSB
// TODO
payload.push_back(0);
// client id length LSB
payload.push_back(mContext->clientId.size());
// client id
payload.push_back(0); // TODO client id length MSB
payload.push_back(mContext->clientId.size()); // client id length LSB
for (size_t i = 0; i < mContext->clientId.size(); ++i) {
payload.push_back(mContext->clientId[i]);
}

return true;
}

bool PacketBuilder::publish(Payload &payload) {
// BEGIN FIXED HEADER
payload.push_back(0b00110000); // MQTT control packet type (3) TODO support
// dup , qos, retain
payload.push_back(2 + mContext->topic.size() +
mContext->message.size()); // remainingLength
// ------------------
// MQTT control packet type (3)
// TODO support dup , qos, retain
payload.push_back(0b00110000);
// remaining length
payload.push_back((2 + mContext->topic.size()) + mContext->message.size());

// BEGIN VARIABLE HEADER
payload.push_back(0); // TODO topic length MSB
payload.push_back(mContext->topic.size()); // topic length LSB
// ---------------------
// topic length MSB
// TODO
payload.push_back(0);
// topic length LSB
payload.push_back(mContext->topic.size());
// topic
for (size_t i = 0; i < mContext->topic.size(); ++i) {
payload.push_back(mContext->topic[i]);
}

// BEGIN PAYLOAD
// -------------
for (size_t i = 0; i < mContext->message.size(); ++i) {
payload.push_back(mContext->message[i]);
}

return true;
}

bool PacketBuilder::subscribe(Payload &payload) {
// BEGIN FIXED HEADER
// ------------------
// MQTT control packet type (8)
payload.push_back(0b10000010);
// remaining length
payload.push_back(2 + (3 + mContext->topic.size()));

// BEGIN VARIABLE HEADER
// ---------------------
// packet identifier MSB
// TODO
payload.push_back(0);
// packet identifier LSB
// TODO support a real identifier
payload.push_back(1);

// BEGIN PAYLOAD
// -------------
// topic length MSB
// TODO
payload.push_back(0);
// topic length LSB
payload.push_back(mContext->topic.size());
// topic
for (size_t i = 0; i < mContext->topic.size(); ++i) {
payload.push_back(mContext->topic[i]);
}
// topic QoS
// TODO
payload.push_back(0);

return true;
}

} // namespace MqttClient
Loading

0 comments on commit fff728b

Please sign in to comment.