Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 127 additions & 92 deletions Examples/Publish.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// run on terminal -> $./Publish test.mosquitto.org 1883

/** @file Publish.c
* Topic "publish" example and test case for the MQTT library.
* @author Adrien RICCIARDI
Expand All @@ -13,6 +15,10 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>

#include "posix_sockets.h"

//-------------------------------------------------------------------------------------------------
// Private variables
Expand All @@ -25,104 +31,133 @@ static unsigned short Publish_Server_Port;
//-------------------------------------------------------------------------------------------------
// Private functions
//-------------------------------------------------------------------------------------------------
void PublishConnectAndPublishData(char *Pointer_String_Client_ID, char *Pointer_String_User_Name, char *Pointer_String_Password, char *Pointer_String_Topic_Name, void *Pointer_Application_Data, int Application_Data_Size)
{
static unsigned char Buffer[1024]; // Avoid storing a big buffer on the stack
TMQTTContext MQTT_Context;
TMQTTConnectionParameters MQTT_Connection_Parameters;
int Socket, Result;
struct sockaddr_in Address;
ssize_t Read_Bytes_Count;

// Create a TCP socket
Socket = socket(AF_INET, SOCK_STREAM, 0);
if (Socket == -1)
{
printf("Error : failed to create socket (%s).\n", strerror(errno));
exit(EXIT_FAILURE);
}

// Try to connect to the server
printf("Connecting to '%s:%d'...\n", Publish_String_Server_IP_Address, Publish_Server_Port);
Address.sin_family = AF_INET;
Address.sin_addr.s_addr = inet_addr(Publish_String_Server_IP_Address);
Address.sin_port = htons(Publish_Server_Port);
if (connect(Socket, (const struct sockaddr *) &Address, sizeof(Address)) == -1)
{
printf("Error : failed to connect to MQTT server (%s).\n", strerror(errno));
exit(EXIT_FAILURE);
}

// Send MQTT CONNECT packet
printf("Sending CONNECT packet...\n");
MQTT_Connection_Parameters.Pointer_String_Client_Identifier = Pointer_String_Client_ID;
MQTT_Connection_Parameters.Pointer_String_User_Name = Pointer_String_User_Name;
MQTT_Connection_Parameters.Pointer_String_Password = Pointer_String_Password;
MQTT_Connection_Parameters.Is_Clean_Session_Enabled = 1;
MQTT_Connection_Parameters.Keep_Alive = 60;
MQTT_Connection_Parameters.Pointer_Buffer = Buffer;
MQTTConnect(&MQTT_Context, &MQTT_Connection_Parameters);
if (write(Socket, MQTT_GET_MESSAGE_BUFFER(&MQTT_Context), MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) != MQTT_GET_MESSAGE_SIZE(&MQTT_Context))
{
printf("Error : failed to send MQTT CONNECT packet (%s).\n", strerror(errno));
exit(EXIT_FAILURE);
}

// Specifications allow to send control packets without waiting for CONNACK, but if the client is too fast the server can't keep up, so wait for the CONNACK
printf("Waiting for CONNACK packet...\n");
Read_Bytes_Count = read(Socket, Buffer, sizeof(Buffer));
Result = MQTTIsConnectionEstablished(Buffer, Read_Bytes_Count);
if (Result != 0)
{
printf("Error : server rejected connection. CONNACK return code : 0x%X\n", Result);
exit(EXIT_FAILURE);
}

// Publish data
printf("Sending PUBLISH packet...\n");
MQTTPublish(&MQTT_Context, Pointer_String_Topic_Name, Pointer_Application_Data, Application_Data_Size);
if (write(Socket, MQTT_GET_MESSAGE_BUFFER(&MQTT_Context), MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) != MQTT_GET_MESSAGE_SIZE(&MQTT_Context))
{
printf("Error : failed to send MQTT PUBLISH packet (%s).\n", strerror(errno));
exit(EXIT_FAILURE);
}

// Disconnect from MQTT server
MQTTDisconnect(&MQTT_Context);
printf("Sending DISCONNECT packet...\n");
if (write(Socket, MQTT_GET_MESSAGE_BUFFER(&MQTT_Context), MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) != MQTT_GET_MESSAGE_SIZE(&MQTT_Context))
{
printf("Error : failed to send MQTT DISCONNECT packet (%s).\n", strerror(errno));
exit(EXIT_FAILURE);
}

putchar('\n');
close(Socket);
ssize_t read_socket(int socket, unsigned char *buffer, size_t size) {
size_t total_read = 0;
ssize_t bytes_read;

while (total_read < size) {
fd_set read_fds;
struct timeval timeout;

FD_ZERO(&read_fds);
FD_SET(socket, &read_fds);
timeout.tv_sec = 5; // Timeout after 5 seconds
timeout.tv_usec = 0;

int select_result = select(socket + 1, &read_fds, NULL, NULL, &timeout);
if (select_result < 0) {
perror("Select error");
return -1;
} else if (select_result == 0) {
fprintf(stderr, "Read timeout\n");
return -1;
}

bytes_read = read(socket, buffer + total_read, size - total_read);
if (bytes_read < 0) {
if (errno == EINTR) {
continue;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
} else {
perror("Read error");
return -1;
}
} else if (bytes_read == 0) {
break;
}

total_read += bytes_read;
printf("Read %zd bytes, total_read: %zu\n", bytes_read, total_read);
}

return total_read;
}

void PublishConnectAndPublishData(char *Pointer_String_Client_ID, char *Pointer_String_User_Name, char *Pointer_String_Password, char *Pointer_String_Topic_Name, void *Pointer_Application_Data, int Application_Data_Size) {
static unsigned char Buffer[1024]; // Avoid storing a big buffer on the stack
TMQTTContext MQTT_Context;
TMQTTConnectionParameters MQTT_Connection_Parameters;
int Socket, Result;
ssize_t Read_Bytes_Count;

// Open non-blocking TCP socket
printf("Opening socket...\n");
char Port_String[6];
snprintf(Port_String, sizeof(Port_String), "%u", Publish_Server_Port);
Socket = open_nb_socket(Publish_String_Server_IP_Address, Port_String);

if (Socket == -1) {
perror("Failed to open socket: ");
exit(EXIT_FAILURE);
}

// Send MQTT CONNECT packet
printf("Sending CONNECT packet...\n");
MQTT_Connection_Parameters.Pointer_String_Client_Identifier = Pointer_String_Client_ID;
MQTT_Connection_Parameters.Pointer_String_User_Name = Pointer_String_User_Name;
MQTT_Connection_Parameters.Pointer_String_Password = Pointer_String_Password;
MQTT_Connection_Parameters.Is_Clean_Session_Enabled = 1;
MQTT_Connection_Parameters.Keep_Alive = 60;
MQTT_Connection_Parameters.Pointer_Buffer = Buffer;
MQTTConnect(&MQTT_Context, &MQTT_Connection_Parameters);
if (write(Socket, MQTT_GET_MESSAGE_BUFFER(&MQTT_Context), MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) != MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) {
printf("Error : failed to send MQTT CONNECT packet (%s).\n", strerror(errno));
exit(EXIT_FAILURE);
}

// Wait for CONNACK packet
printf("Waiting for CONNACK packet...\n");
Read_Bytes_Count = read_socket(Socket, Buffer, MQTT_CONNACK_MESSAGE_SIZE);
if (Read_Bytes_Count < 0) {
printf("Error : failed to read CONNACK packet.\n");
exit(EXIT_FAILURE);
}

Result = MQTTIsConnectionEstablished(Buffer, Read_Bytes_Count);
if (Result != 0) {
printf("Error : server rejected connection. CONNACK return code : 0x%X\n", Result);
printf("Client ID: %s\n", Pointer_String_Client_ID);
printf("Username: %s\n", Pointer_String_User_Name ? Pointer_String_User_Name : "NULL");
printf("Password: %s\n", Pointer_String_Password ? Pointer_String_Password : "NULL");
exit(EXIT_FAILURE);
}

// Publish data
printf("Sending PUBLISH packet...\n");
MQTTPublish(&MQTT_Context, Pointer_String_Topic_Name, Pointer_Application_Data, Application_Data_Size);
if (write(Socket, MQTT_GET_MESSAGE_BUFFER(&MQTT_Context), MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) != MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) {
printf("Error : failed to send MQTT PUBLISH packet (%s).\n", strerror(errno));
exit(EXIT_FAILURE);
}

// Disconnect from MQTT server
MQTTDisconnect(&MQTT_Context);
printf("Sending DISCONNECT packet...\n");
if (write(Socket, MQTT_GET_MESSAGE_BUFFER(&MQTT_Context), MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) != MQTT_GET_MESSAGE_SIZE(&MQTT_Context)) {
printf("Error : failed to send MQTT DISCONNECT packet (%s).\n", strerror(errno));
exit(EXIT_FAILURE);
}

putchar('\n');
close(Socket);
}

//-------------------------------------------------------------------------------------------------
// Entry point
//-------------------------------------------------------------------------------------------------
int main(int argc, char *argv[])
{
// Check parameters
if (argc != 3)
{
printf("Usage : %s MQTT_Server_IP_Address MQTT_Server_Port\n"
"Use wireshark or another network traffic analyzer in the same time to check if the packets are well formed.\n", argv[0]);
return EXIT_FAILURE;
}
strcpy(Publish_String_Server_IP_Address, argv[1]);
int main(int argc, char *argv[]) {
// Check parameters
if (argc != 3) {
printf("Usage : %s MQTT_Server_IP_Address MQTT_Server_Port\n"
"Use wireshark or another network traffic analyzer in the same time to check if the packets are well formed.\n", argv[0]);
return EXIT_FAILURE;
}
strcpy(Publish_String_Server_IP_Address, argv[1]);
Publish_Server_Port = atoi(argv[2]);

// Test message with all parameters set
PublishConnectAndPublishData("ID du client", "nom d'utilisateur", "super mot de passe de ouf", "test1", "charge utile courte", sizeof("charge utile courte") - 1);

// Send a "big" message to test "remaining length" computation algorithm
PublishConnectAndPublishData("ID du client", "Ceci est un message bien plus long pour voir si le calcul d'une taille de paquet supérieure à 127 octets fonctionne correctement", "Ce champ aussi est allongé dans le but décrit exhaustivement dans le champ précédent", "le topic est lui aussi assez long pour que le message publish dépasse 127 caractères et qu'il faille calculer la taille sur deux octets", "la charge utile est un peu plus longue aussi même si la longueur du topic devrait suffire pour atteindre les 127 octets", sizeof("la charge utile est un peu plus longue aussi même si la longueur du topic devrait suffire pour atteindre les 127 octets") - 1);

// Send a message with less parameters
PublishConnectAndPublishData("ID du client", NULL, NULL, "topic", "données", sizeof("données") - 1);

PublishConnectAndPublishData("ClientID", NULL, NULL, "test/topic", "HI MQTT!", sizeof("HI MQTT!") - 1);

return 0;
}
Loading