Skip to content

Commit 9fc24c4

Browse files
committed
Add persistent storage for MQTT state
Related to FreeRTOS#271 Add support for storing and restoring MQTT state in persistent memory to handle QoS2 messages after a device reboot. * Add function prototypes for `MQTT_SetOutgoingPublishRecord`, `MQTT_GetOutgoingPublishRecord`, and `MQTT_GetFailedPacketId` in `source/include/core_mqtt_state.h`. * Implement `MQTT_SetOutgoingPublishRecord`, `MQTT_GetOutgoingPublishRecord`, and `MQTT_GetFailedPacketId` functions in `source/core_mqtt_state.c`. * Update `README.md` to include instructions on using the new setter and getter functions. * Add unit tests for `MQTT_SetOutgoingPublishRecord`, `MQTT_GetOutgoingPublishRecord`, and `MQTT_GetFailedPacketId` functions in `test/unit-test/core_mqtt_state_utest.c`.
1 parent f1827d8 commit 9fc24c4

File tree

4 files changed

+326
-8
lines changed

4 files changed

+326
-8
lines changed

README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,3 +248,38 @@ doxygen docs/doxygen/config.doxyfile
248248

249249
See [CONTRIBUTING.md](./.github/CONTRIBUTING.md) for information on
250250
contributing.
251+
252+
## Using Setter and Getter Functions for Persistent Storage
253+
254+
The coreMQTT library provides setter and getter functions to allow the application to store and restore MQTT state in persistent memory. This is useful for handling QoS2 messages after a device reboot.
255+
256+
### Setter Function
257+
258+
The `MQTT_SetOutgoingPublishRecord` function allows the application to set an outgoing publish record in the MQTT context. It can be used to restore the state of the MQTT context after a device reboot.
259+
260+
```c
261+
MQTTStatus_t MQTT_SetOutgoingPublishRecord( MQTTContext_t * pMqttContext,
262+
uint16_t packetId,
263+
MQTTQoS_t qos,
264+
MQTTPublishState_t publishState );
265+
```
266+
267+
### Getter Function
268+
269+
The `MQTT_GetOutgoingPublishRecord` function allows the application to get an outgoing publish record from the MQTT context. It can be used to store the state of the MQTT context in persistent memory before a device reboot.
270+
271+
```c
272+
MQTTStatus_t MQTT_GetOutgoingPublishRecord( const MQTTContext_t * pMqttContext,
273+
uint16_t packetId,
274+
MQTTQoS_t * pQos,
275+
MQTTPublishState_t * pPublishState );
276+
```
277+
278+
### Getting the Failed Packet ID
279+
280+
The `MQTT_GetFailedPacketId` function allows the application to get the packet ID of the failed packet from the MQTT context. It can be used to handle the situation when the library loses state after a device reboot.
281+
282+
```c
283+
MQTTStatus_t MQTT_GetFailedPacketId( const MQTTContext_t * pMqttContext,
284+
uint16_t * pPacketId );
285+
```

source/core_mqtt_state.c

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,3 +1204,93 @@ const char * MQTT_State_strerror( MQTTPublishState_t state )
12041204
}
12051205

12061206
/*-----------------------------------------------------------*/
1207+
1208+
MQTTStatus_t MQTT_SetOutgoingPublishRecord( MQTTContext_t * pMqttContext,
1209+
uint16_t packetId,
1210+
MQTTQoS_t qos,
1211+
MQTTPublishState_t publishState )
1212+
{
1213+
MQTTStatus_t status = MQTTSuccess;
1214+
1215+
if( ( pMqttContext == NULL ) || ( packetId == MQTT_PACKET_ID_INVALID ) || ( qos == MQTTQoS0 ) )
1216+
{
1217+
status = MQTTBadParameter;
1218+
}
1219+
else
1220+
{
1221+
status = addRecord( pMqttContext->outgoingPublishRecords,
1222+
pMqttContext->outgoingPublishRecordMaxCount,
1223+
packetId,
1224+
qos,
1225+
publishState );
1226+
}
1227+
1228+
return status;
1229+
}
1230+
1231+
MQTTStatus_t MQTT_GetOutgoingPublishRecord( const MQTTContext_t * pMqttContext,
1232+
uint16_t packetId,
1233+
MQTTQoS_t * pQos,
1234+
MQTTPublishState_t * pPublishState )
1235+
{
1236+
MQTTStatus_t status = MQTTSuccess;
1237+
size_t recordIndex;
1238+
1239+
if( ( pMqttContext == NULL ) || ( packetId == MQTT_PACKET_ID_INVALID ) || ( pQos == NULL ) || ( pPublishState == NULL ) )
1240+
{
1241+
status = MQTTBadParameter;
1242+
}
1243+
else
1244+
{
1245+
recordIndex = findInRecord( pMqttContext->outgoingPublishRecords,
1246+
pMqttContext->outgoingPublishRecordMaxCount,
1247+
packetId,
1248+
pQos,
1249+
pPublishState );
1250+
1251+
if( recordIndex == MQTT_INVALID_STATE_COUNT )
1252+
{
1253+
status = MQTTBadParameter;
1254+
}
1255+
}
1256+
1257+
return status;
1258+
}
1259+
1260+
MQTTStatus_t MQTT_GetFailedPacketId( const MQTTContext_t * pMqttContext,
1261+
uint16_t * pPacketId )
1262+
{
1263+
MQTTStatus_t status = MQTTSuccess;
1264+
size_t recordIndex;
1265+
MQTTQoS_t qos;
1266+
MQTTPublishState_t publishState;
1267+
1268+
if( ( pMqttContext == NULL ) || ( pPacketId == NULL ) )
1269+
{
1270+
status = MQTTBadParameter;
1271+
}
1272+
else
1273+
{
1274+
for( recordIndex = 0; recordIndex < pMqttContext->outgoingPublishRecordMaxCount; recordIndex++ )
1275+
{
1276+
if( pMqttContext->outgoingPublishRecords[ recordIndex ].packetId != MQTT_PACKET_ID_INVALID )
1277+
{
1278+
qos = pMqttContext->outgoingPublishRecords[ recordIndex ].qos;
1279+
publishState = pMqttContext->outgoingPublishRecords[ recordIndex ].publishState;
1280+
1281+
if( ( qos == MQTTQoS2 ) && ( publishState == MQTTPubRelSend ) )
1282+
{
1283+
*pPacketId = pMqttContext->outgoingPublishRecords[ recordIndex ].packetId;
1284+
break;
1285+
}
1286+
}
1287+
}
1288+
1289+
if( recordIndex == pMqttContext->outgoingPublishRecordMaxCount )
1290+
{
1291+
status = MQTTBadParameter;
1292+
}
1293+
}
1294+
1295+
return status;
1296+
}

source/include/core_mqtt_state.h

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
* copies or substantial portions of the Software.
1616
*
1717
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18-
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
19-
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
20-
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
21-
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
18+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
20+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21+
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
2222
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2323
*/
2424

@@ -301,6 +301,59 @@ uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext,
301301
const char * MQTT_State_strerror( MQTTPublishState_t state );
302302
/** @endcond */
303303

304+
/**
305+
* @brief Set an outgoing publish record in the MQTT context.
306+
*
307+
* This function allows the application to set an outgoing publish record in the
308+
* MQTT context. It can be used to restore the state of the MQTT context after a
309+
* device reboot.
310+
*
311+
* @param[in] pMqttContext Initialized MQTT context.
312+
* @param[in] packetId ID of the PUBLISH packet.
313+
* @param[in] qos QoS of the PUBLISH packet.
314+
* @param[in] publishState State of the PUBLISH packet.
315+
*
316+
* @return #MQTTBadParameter, #MQTTNoMemory, or #MQTTSuccess.
317+
*/
318+
MQTTStatus_t MQTT_SetOutgoingPublishRecord( MQTTContext_t * pMqttContext,
319+
uint16_t packetId,
320+
MQTTQoS_t qos,
321+
MQTTPublishState_t publishState );
322+
323+
/**
324+
* @brief Get an outgoing publish record from the MQTT context.
325+
*
326+
* This function allows the application to get an outgoing publish record from the
327+
* MQTT context. It can be used to store the state of the MQTT context in persistent
328+
* memory before a device reboot.
329+
*
330+
* @param[in] pMqttContext Initialized MQTT context.
331+
* @param[in] packetId ID of the PUBLISH packet.
332+
* @param[out] pQos QoS of the PUBLISH packet.
333+
* @param[out] pPublishState State of the PUBLISH packet.
334+
*
335+
* @return #MQTTBadParameter, #MQTTSuccess.
336+
*/
337+
MQTTStatus_t MQTT_GetOutgoingPublishRecord( const MQTTContext_t * pMqttContext,
338+
uint16_t packetId,
339+
MQTTQoS_t * pQos,
340+
MQTTPublishState_t * pPublishState );
341+
342+
/**
343+
* @brief Get the packet ID of the failed packet.
344+
*
345+
* This function allows the application to get the packet ID of the failed packet
346+
* from the MQTT context. It can be used to handle the situation when the library
347+
* loses state after a device reboot.
348+
*
349+
* @param[in] pMqttContext Initialized MQTT context.
350+
* @param[out] pPacketId ID of the failed packet.
351+
*
352+
* @return #MQTTBadParameter, #MQTTSuccess.
353+
*/
354+
MQTTStatus_t MQTT_GetFailedPacketId( const MQTTContext_t * pMqttContext,
355+
uint16_t * pPacketId );
356+
304357
/* *INDENT-OFF* */
305358
#ifdef __cplusplus
306359
}

test/unit-test/core_mqtt_state_utest.c

Lines changed: 144 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
* copies or substantial portions of the Software.
1616
*
1717
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18-
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
19-
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
20-
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
21-
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
18+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
20+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21+
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
2222
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2323
*/
2424

@@ -1168,3 +1168,143 @@ void test_MQTT_State_strerror( void )
11681168
}
11691169

11701170
/* ========================================================================== */
1171+
1172+
void test_MQTT_SetOutgoingPublishRecord( void )
1173+
{
1174+
MQTTContext_t mqttContext = { 0 };
1175+
MQTTStatus_t status;
1176+
const uint16_t PACKET_ID = 1;
1177+
const MQTTQoS_t qos = MQTTQoS2;
1178+
const MQTTPublishState_t publishState = MQTTPubRelSend;
1179+
TransportInterface_t transport;
1180+
MQTTFixedBuffer_t networkBuffer = { 0 };
1181+
1182+
transport.recv = transportRecvSuccess;
1183+
transport.send = transportSendSuccess;
1184+
1185+
MQTTPubAckInfo_t incomingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
1186+
MQTTPubAckInfo_t outgoingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
1187+
1188+
status = MQTT_Init( &mqttContext, &transport,
1189+
getTime, eventCallback, &networkBuffer );
1190+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1191+
1192+
status = MQTT_InitStatefulQoS( &mqttContext,
1193+
outgoingRecords, MQTT_STATE_ARRAY_MAX_COUNT,
1194+
incomingRecords, MQTT_STATE_ARRAY_MAX_COUNT );
1195+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1196+
1197+
/* Test for bad parameters */
1198+
status = MQTT_SetOutgoingPublishRecord( NULL, PACKET_ID, qos, publishState );
1199+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1200+
status = MQTT_SetOutgoingPublishRecord( &mqttContext, MQTT_PACKET_ID_INVALID, qos, publishState );
1201+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1202+
status = MQTT_SetOutgoingPublishRecord( &mqttContext, PACKET_ID, MQTTQoS0, publishState );
1203+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1204+
1205+
/* Success. */
1206+
status = MQTT_SetOutgoingPublishRecord( &mqttContext, PACKET_ID, qos, publishState );
1207+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1208+
/* Verify the record is added correctly. */
1209+
TEST_ASSERT_EQUAL( PACKET_ID, mqttContext.outgoingPublishRecords[ 0 ].packetId );
1210+
TEST_ASSERT_EQUAL( qos, mqttContext.outgoingPublishRecords[ 0 ].qos );
1211+
TEST_ASSERT_EQUAL( publishState, mqttContext.outgoingPublishRecords[ 0 ].publishState );
1212+
}
1213+
1214+
/* ========================================================================== */
1215+
1216+
void test_MQTT_GetOutgoingPublishRecord( void )
1217+
{
1218+
MQTTContext_t mqttContext = { 0 };
1219+
MQTTStatus_t status;
1220+
const uint16_t PACKET_ID = 1;
1221+
const MQTTQoS_t qos = MQTTQoS2;
1222+
const MQTTPublishState_t publishState = MQTTPubRelSend;
1223+
MQTTQoS_t retrievedQos;
1224+
MQTTPublishState_t retrievedPublishState;
1225+
TransportInterface_t transport;
1226+
MQTTFixedBuffer_t networkBuffer = { 0 };
1227+
1228+
transport.recv = transportRecvSuccess;
1229+
transport.send = transportSendSuccess;
1230+
1231+
MQTTPubAckInfo_t incomingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
1232+
MQTTPubAckInfo_t outgoingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
1233+
1234+
status = MQTT_Init( &mqttContext, &transport,
1235+
getTime, eventCallback, &networkBuffer );
1236+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1237+
1238+
status = MQTT_InitStatefulQoS( &mqttContext,
1239+
outgoingRecords, MQTT_STATE_ARRAY_MAX_COUNT,
1240+
incomingRecords, MQTT_STATE_ARRAY_MAX_COUNT );
1241+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1242+
1243+
/* Test for bad parameters */
1244+
status = MQTT_GetOutgoingPublishRecord( NULL, PACKET_ID, &retrievedQos, &retrievedPublishState );
1245+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1246+
status = MQTT_GetOutgoingPublishRecord( &mqttContext, MQTT_PACKET_ID_INVALID, &retrievedQos, &retrievedPublishState );
1247+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1248+
status = MQTT_GetOutgoingPublishRecord( &mqttContext, PACKET_ID, NULL, &retrievedPublishState );
1249+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1250+
status = MQTT_GetOutgoingPublishRecord( &mqttContext, PACKET_ID, &retrievedQos, NULL );
1251+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1252+
1253+
/* No record found. */
1254+
status = MQTT_GetOutgoingPublishRecord( &mqttContext, PACKET_ID, &retrievedQos, &retrievedPublishState );
1255+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1256+
1257+
/* Success. */
1258+
addToRecord( mqttContext.outgoingPublishRecords, 0, PACKET_ID, qos, publishState );
1259+
status = MQTT_GetOutgoingPublishRecord( &mqttContext, PACKET_ID, &retrievedQos, &retrievedPublishState );
1260+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1261+
/* Verify the record is retrieved correctly. */
1262+
TEST_ASSERT_EQUAL( qos, retrievedQos );
1263+
TEST_ASSERT_EQUAL( publishState, retrievedPublishState );
1264+
}
1265+
1266+
/* ========================================================================== */
1267+
1268+
void test_MQTT_GetFailedPacketId( void )
1269+
{
1270+
MQTTContext_t mqttContext = { 0 };
1271+
MQTTStatus_t status;
1272+
const uint16_t PACKET_ID = 1;
1273+
const MQTTQoS_t qos = MQTTQoS2;
1274+
const MQTTPublishState_t publishState = MQTTPubRelSend;
1275+
uint16_t retrievedPacketId;
1276+
TransportInterface_t transport;
1277+
MQTTFixedBuffer_t networkBuffer = { 0 };
1278+
1279+
transport.recv = transportRecvSuccess;
1280+
transport.send = transportSendSuccess;
1281+
1282+
MQTTPubAckInfo_t incomingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
1283+
MQTTPubAckInfo_t outgoingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
1284+
1285+
status = MQTT_Init( &mqttContext, &transport,
1286+
getTime, eventCallback, &networkBuffer );
1287+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1288+
1289+
status = MQTT_InitStatefulQoS( &mqttContext,
1290+
outgoingRecords, MQTT_STATE_ARRAY_MAX_COUNT,
1291+
incomingRecords, MQTT_STATE_ARRAY_MAX_COUNT );
1292+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1293+
1294+
/* Test for bad parameters */
1295+
status = MQTT_GetFailedPacketId( NULL, &retrievedPacketId );
1296+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1297+
status = MQTT_GetFailedPacketId( &mqttContext, NULL );
1298+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1299+
1300+
/* No record found. */
1301+
status = MQTT_GetFailedPacketId( &mqttContext, &retrievedPacketId );
1302+
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
1303+
1304+
/* Success. */
1305+
addToRecord( mqttContext.outgoingPublishRecords, 0, PACKET_ID, qos, publishState );
1306+
status = MQTT_GetFailedPacketId( &mqttContext, &retrievedPacketId );
1307+
TEST_ASSERT_EQUAL( MQTTSuccess, status );
1308+
/* Verify the packet ID is retrieved correctly. */
1309+
TEST_ASSERT_EQUAL( PACKET_ID, retrievedPacketId );
1310+
}

0 commit comments

Comments
 (0)