From b28bccc5d54997fb1080a9868813d7197a77f2dc Mon Sep 17 00:00:00 2001 From: rtel Date: Thu, 25 Jul 2019 20:20:24 +0000 Subject: [PATCH] Files as per 190725_FreeRTOS_IoT_Libs_Task_Pool_and_MQTT_Preview interim release. git-svn-id: https://svn.code.sf.net/p/freertos/code/trunk@2708 1d2547de-c912-0410-9cb9-b8ca96c0e9e2 --- .../mqtt/READ_ME_INSTRUCTIONS.url | 5 + .../FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj | 4 - .../mqtt/WIN32.vcxproj.filters | 12 - .../Demo/FreeRTOS_IoT_Libraries/mqtt/main.c | 13 +- .../DemoTasks/SimpleTaskPoolExamples.c | 1 - .../task_pool/READ_ME_INSTRUCTIONS.url | 6 + .../task_pool/WIN32.vcxproj | 2 +- .../task_pool/WIN32.vcxproj.filters | 6 +- .../FreeRTOS_IoT_Libraries/task_pool/main.c | 12 +- .../c_sdk/standard/common/include/atomic.h | 547 ++++++++++++++++++ .../c_sdk/standard/mqtt/include/iot_mqtt.h | 8 +- .../c_sdk/standard/mqtt/src/iot_mqtt_api.c | 101 ++-- .../standard/mqtt/src/iot_mqtt_network.c | 210 ++++--- .../standard/mqtt/src/iot_mqtt_operation.c | 158 +++-- .../standard/mqtt/src/iot_mqtt_serialize.c | 4 +- .../standard/mqtt/src/iot_mqtt_subscription.c | 4 +- .../mqtt/src/private/iot_mqtt_internal.h | 157 ++--- 17 files changed, 968 insertions(+), 282 deletions(-) create mode 100644 FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url create mode 100644 FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url create mode 100644 FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url new file mode 100644 index 000000000..8a5a8617e --- /dev/null +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url @@ -0,0 +1,5 @@ +[{000214A0-0000-0000-C000-000000000046}] +Prop3=19,11 +[InternetShortcut] +IDList= +URL=https://www.freertos.org/mqtt/ diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj index dda52fb5b..b92278318 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj @@ -198,10 +198,6 @@ - - - - diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters index 4edf58197..98cdff0ad 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters @@ -286,18 +286,6 @@ FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include - - FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include - - - FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include - - - FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include - - - FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include - FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include\types diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c index 213defd03..bedfd3e85 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c @@ -25,9 +25,9 @@ * 1 tab == 4 spaces! */ -/* - * TBD - */ + /*** + * See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions. + ***/ /* Standard includes. */ #include @@ -102,10 +102,9 @@ static UBaseType_t ulNextRand; int main( void ) { - /* - * Instructions for using this project are provided on: - * TBD - */ + /*** + * See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions. + ***/ /* Miscellaneous initialisation including preparing the logging and seeding the random number generator. */ diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c index f1c7a42a4..fb2eaca26 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c @@ -25,7 +25,6 @@ * 1 tab == 4 spaces! */ -//_RB_ Add link to docs here. /* Kernel includes. */ #include "FreeRTOS.h" diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url new file mode 100644 index 000000000..c00147b27 --- /dev/null +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url @@ -0,0 +1,6 @@ +[{000214A0-0000-0000-C000-000000000046}] +Prop3=19,11 +[InternetShortcut] +IDList= +URL=https://www.freertos.org/task-pool/ +HotKey=0 diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj index 4c7704c70..7f9596ad3 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj @@ -177,7 +177,7 @@ - + diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters index 4f426c818..3653a6cbe 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters @@ -250,11 +250,11 @@ - - FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform - FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\include\types + + FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform\types + \ No newline at end of file diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c index 7f42f9ab2..2cf6b3829 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c +++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c @@ -25,6 +25,11 @@ * 1 tab == 4 spaces! */ + /*** + * See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions. + ***/ + + /* Standard includes. */ #include #include @@ -55,10 +60,9 @@ const uint8_t ucMACAddress[ 6 ] = { configMAC_ADDR0, configMAC_ADDR1, configMAC_ int main( void ) { - /* - * Instructions for using this project are provided on: - * TBD - */ + /*** + * See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions. + ***/ /* Create the example that demonstrates task pool functionality. Examples that demonstrate networking connectivity will be added in future projects diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h new file mode 100644 index 000000000..d44593d8f --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h @@ -0,0 +1,547 @@ +/* + * FreeRTOS Kernel V10.2.0 + * Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * http://www.FreeRTOS.org + * http://aws.amazon.com/freertos + * + * 1 tab == 4 spaces! + */ + +/** + * @file atomic.h + * @brief FreeRTOS atomic operation support. + * + * Two implementations of atomic are given in this header file: + * 1. Disabling interrupt globally. + * 2. ISA native atomic support. + * The former is available to all ports (compiler-architecture combination), + * while the latter is only available to ports compiling with GCC (version at + * least 4.7.0), which also have ISA atomic support. + * + * User can select which implementation to use by: + * setting/clearing configUSE_ATOMIC_INSTRUCTION in FreeRTOSConfig.h. + * Define AND set configUSE_ATOMIC_INSTRUCTION to 1 for ISA native atomic support. + * Undefine OR clear configUSE_ATOMIC_INSTRUCTION for disabling global interrupt + * implementation. + * + * @see GCC Built-in Functions for Memory Model Aware Atomic Operations + * https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html + */ + +#ifndef ATOMIC_H +#define ATOMIC_H + +#ifndef INC_FREERTOS_H + #error "include FreeRTOS.h must appear in source files before include atomic.h" +#endif + +/* Standard includes. */ +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + /* Needed for __atomic_compare_exchange() weak=false. */ + #include + + /* This branch is for GCC compiler and GCC compiler only. */ + #ifndef portFORCE_INLINE + #define portFORCE_INLINE inline __attribute__((always_inline)) + #endif + +#else + + /* Port specific definitions -- entering/exiting critical section. + * Refer template -- ./lib/FreeRTOS/portable/Compiler/Arch/portmacro.h + * + * Every call to ATOMIC_EXIT_CRITICAL() must be closely paired with + * ATOMIC_ENTER_CRITICAL(). + */ + #if defined( portSET_INTERRUPT_MASK_FROM_ISR ) + + /* Nested interrupt scheme is supported in this port. */ + #define ATOMIC_ENTER_CRITICAL() \ + UBaseType_t uxCriticalSectionType = portSET_INTERRUPT_MASK_FROM_ISR() + + #define ATOMIC_EXIT_CRITICAL() \ + portCLEAR_INTERRUPT_MASK_FROM_ISR( uxCriticalSectionType ) + + #else + + /* Nested interrupt scheme is NOT supported in this port. */ + #define ATOMIC_ENTER_CRITICAL() portENTER_CRITICAL() + #define ATOMIC_EXIT_CRITICAL() portEXIT_CRITICAL() + + #endif /* portSET_INTERRUPT_MASK_FROM_ISR() */ + + /* Port specific definition -- "always inline". + * Inline is compiler specific, and may not always get inlined depending on your optimization level. + * Also, inline is considerred as performance optimization for atomic. + * Thus, if portFORCE_INLINE is not provided by portmacro.h, instead of resulting error, + * simply define it. + */ + #ifndef portFORCE_INLINE + #define portFORCE_INLINE + #endif + +#endif /* configUSE_GCC_BUILTIN_ATOMICS */ + +#define ATOMIC_COMPARE_AND_SWAP_SUCCESS 0x1U /**< Compare and swap succeeded, swapped. */ +#define ATOMIC_COMPARE_AND_SWAP_FAILURE 0x0U /**< Compare and swap failed, did not swap. */ + +/*----------------------------- Swap && CAS ------------------------------*/ + +/** + * Atomic compare-and-swap + * + * @brief Performs an atomic compare-and-swap operation on the specified values. + * + * @param[in, out] pDestination Pointer to memory location from where value is + * to be loaded and checked. + * @param[in] ulExchange If condition meets, write this value to memory. + * @param[in] ulComparand Swap condition. + * + * @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped. + * + * @note This function only swaps *pDestination with ulExchange, if previous + * *pDestination value equals ulComparand. + */ +static portFORCE_INLINE uint32_t Atomic_CompareAndSwap_u32( + uint32_t volatile * pDestination, + uint32_t ulExchange, + uint32_t ulComparand ) +{ + + uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE; + +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + if ( __atomic_compare_exchange( pDestination, + &ulComparand, + &ulExchange, + false, + __ATOMIC_SEQ_CST, + __ATOMIC_SEQ_CST ) ) + { + ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS; + } + +#else + + ATOMIC_ENTER_CRITICAL(); + + if ( *pDestination == ulComparand ) + { + *pDestination = ulExchange; + ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS; + } + + ATOMIC_EXIT_CRITICAL(); + +#endif + + return ulReturnValue; + +} + +/** + * Atomic swap (pointers) + * + * @brief Atomically sets the address pointed to by *ppDestination to the value + * of *pExchange. + * + * @param[in, out] ppDestination Pointer to memory location from where a pointer + * value is to be loaded and written back to. + * @param[in] pExchange Pointer value to be written to *ppDestination. + * + * @return The initial value of *ppDestination. + */ +static portFORCE_INLINE void * Atomic_SwapPointers_p32( + void * volatile * ppDestination, + void * pExchange ) +{ + void * pReturnValue; + +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + __atomic_exchange( ppDestination, &pExchange, &pReturnValue, __ATOMIC_SEQ_CST ); + +#else + + ATOMIC_ENTER_CRITICAL(); + + pReturnValue = *ppDestination; + + *ppDestination = pExchange; + + ATOMIC_EXIT_CRITICAL(); + +#endif + + return pReturnValue; +} + +/** + * Atomic compare-and-swap (pointers) + * + * @brief Performs an atomic compare-and-swap operation on the specified pointer + * values. + * + * @param[in, out] ppDestination Pointer to memory location from where a pointer + * value is to be loaded and checked. + * @param[in] pExchange If condition meets, write this value to memory. + * @param[in] pComparand Swap condition. + * + * @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped. + * + * @note This function only swaps *ppDestination with pExchange, if previous + * *ppDestination value equals pComparand. + */ +static portFORCE_INLINE uint32_t Atomic_CompareAndSwapPointers_p32( + void * volatile * ppDestination, + void * pExchange, void * pComparand ) +{ + uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE; + +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + if ( __atomic_compare_exchange( ppDestination, + &pComparand, + &pExchange, + false, + __ATOMIC_SEQ_CST, + __ATOMIC_SEQ_CST ) ) + { + ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS; + } + +#else + + ATOMIC_ENTER_CRITICAL(); + + if ( *ppDestination == pComparand ) + { + *ppDestination = pExchange; + ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS; + } + + ATOMIC_EXIT_CRITICAL(); + +#endif + + return ulReturnValue; +} + + +/*----------------------------- Arithmetic ------------------------------*/ + +/** + * Atomic add + * + * @brief Atomically adds count to the value of the specified pointer points to. + * + * @param[in,out] pAddend Pointer to memory location from where value is to be + * loaded and written back to. + * @param[in] ulCount Value to be added to *pAddend. + * + * @return previous *pAddend value. + */ +static portFORCE_INLINE uint32_t Atomic_Add_u32( + uint32_t volatile * pAddend, + uint32_t ulCount ) +{ +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + return __atomic_fetch_add(pAddend, ulCount, __ATOMIC_SEQ_CST); + +#else + + uint32_t ulCurrent; + + ATOMIC_ENTER_CRITICAL(); + + ulCurrent = *pAddend; + + *pAddend += ulCount; + + ATOMIC_EXIT_CRITICAL(); + + return ulCurrent; + +#endif +} + +/** + * Atomic subtract + * + * @brief Atomically subtracts count from the value of the specified pointer + * pointers to. + * + * @param[in,out] pAddend Pointer to memory location from where value is to be + * loaded and written back to. + * @param[in] ulCount Value to be subtract from *pAddend. + * + * @return previous *pAddend value. + */ +static portFORCE_INLINE uint32_t Atomic_Subtract_u32( + uint32_t volatile * pAddend, + uint32_t ulCount ) +{ +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + return __atomic_fetch_sub(pAddend, ulCount, __ATOMIC_SEQ_CST); + +#else + + uint32_t ulCurrent; + + ATOMIC_ENTER_CRITICAL(); + + ulCurrent = *pAddend; + + *pAddend -= ulCount; + + ATOMIC_EXIT_CRITICAL(); + + return ulCurrent; + +#endif +} + +/** + * Atomic increment + * + * @brief Atomically increments the value of the specified pointer points to. + * + * @param[in,out] pAddend Pointer to memory location from where value is to be + * loaded and written back to. + * + * @return *pAddend value before increment. + */ +static portFORCE_INLINE uint32_t Atomic_Increment_u32( uint32_t volatile * pAddend ) +{ +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + return __atomic_fetch_add(pAddend, 1, __ATOMIC_SEQ_CST); + +#else + + uint32_t ulCurrent; + + ATOMIC_ENTER_CRITICAL(); + + ulCurrent = *pAddend; + + *pAddend += 1; + + ATOMIC_EXIT_CRITICAL(); + + return ulCurrent; + +#endif +} + +/** + * Atomic decrement + * + * @brief Atomically decrements the value of the specified pointer points to + * + * @param[in,out] pAddend Pointer to memory location from where value is to be + * loaded and written back to. + * + * @return *pAddend value before decrement. + */ +static portFORCE_INLINE uint32_t Atomic_Decrement_u32( uint32_t volatile * pAddend ) +{ +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + return __atomic_fetch_sub(pAddend, 1, __ATOMIC_SEQ_CST); + +#else + + uint32_t ulCurrent; + + ATOMIC_ENTER_CRITICAL(); + + ulCurrent = *pAddend; + + *pAddend -= 1; + + ATOMIC_EXIT_CRITICAL(); + + return ulCurrent; + +#endif +} + +/*----------------------------- Bitwise Logical ------------------------------*/ + +/** + * Atomic OR + * + * @brief Performs an atomic OR operation on the specified values. + * + * @param [in, out] pDestination Pointer to memory location from where value is + * to be loaded and written back to. + * @param [in] ulValue Value to be ORed with *pDestination. + * + * @return The original value of *pDestination. + */ +static portFORCE_INLINE uint32_t Atomic_OR_u32( + uint32_t volatile * pDestination, + uint32_t ulValue ) +{ +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + return __atomic_fetch_or(pDestination, ulValue, __ATOMIC_SEQ_CST); + +#else + + uint32_t ulCurrent; + + ATOMIC_ENTER_CRITICAL(); + + ulCurrent = *pDestination; + + *pDestination |= ulValue; + + ATOMIC_EXIT_CRITICAL(); + + return ulCurrent; + +#endif +} + +/** + * Atomic AND + * + * @brief Performs an atomic AND operation on the specified values. + * + * @param [in, out] pDestination Pointer to memory location from where value is + * to be loaded and written back to. + * @param [in] ulValue Value to be ANDed with *pDestination. + * + * @return The original value of *pDestination. + */ +static portFORCE_INLINE uint32_t Atomic_AND_u32( + uint32_t volatile * pDestination, + uint32_t ulValue ) +{ +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + return __atomic_fetch_and(pDestination, ulValue, __ATOMIC_SEQ_CST); + +#else + + uint32_t ulCurrent; + + ATOMIC_ENTER_CRITICAL(); + + ulCurrent = *pDestination; + + *pDestination &= ulValue; + + ATOMIC_EXIT_CRITICAL(); + + return ulCurrent; + +#endif +} + +/** + * Atomic NAND + * + * @brief Performs an atomic NAND operation on the specified values. + * + * @param [in, out] pDestination Pointer to memory location from where value is + * to be loaded and written back to. + * @param [in] ulValue Value to be NANDed with *pDestination. + * + * @return The original value of *pDestination. + */ +static portFORCE_INLINE uint32_t Atomic_NAND_u32( + uint32_t volatile * pDestination, + uint32_t ulValue ) +{ +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + return __atomic_fetch_nand(pDestination, ulValue, __ATOMIC_SEQ_CST); + +#else + + uint32_t ulCurrent; + + ATOMIC_ENTER_CRITICAL(); + + ulCurrent = *pDestination; + + *pDestination = ~(ulCurrent & ulValue); + + ATOMIC_EXIT_CRITICAL(); + + return ulCurrent; + +#endif +} + +/** + * Atomic XOR + * + * @brief Performs an atomic XOR operation on the specified values. + * + * @param [in, out] pDestination Pointer to memory location from where value is + * to be loaded and written back to. + * @param [in] ulValue Value to be XORed with *pDestination. + * + * @return The original value of *pDestination. + */ +static portFORCE_INLINE uint32_t Atomic_XOR_u32( + uint32_t volatile * pDestination, + uint32_t ulValue ) +{ +#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 ) + + return __atomic_fetch_xor(pDestination, ulValue, __ATOMIC_SEQ_CST); + +#else + + uint32_t ulCurrent; + + ATOMIC_ENTER_CRITICAL(); + + ulCurrent = *pDestination; + + *pDestination ^= ulValue; + + ATOMIC_EXIT_CRITICAL(); + + return ulCurrent; + +#endif +} + +#ifdef __cplusplus +} +#endif + +#endif /* ATOMIC_H */ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h index 327784a3e..dad106a68 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h @@ -427,7 +427,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection, size_t subscriptionCount, uint32_t flags, const IotMqttCallbackInfo_t * pCallbackInfo, - IotMqttOperation_t * pSubscribeOperation ); + IotMqttOperation_t * const pSubscribeOperation ); /* @[declare_mqtt_subscribe] */ /** @@ -512,7 +512,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection, size_t subscriptionCount, uint32_t flags, const IotMqttCallbackInfo_t * pCallbackInfo, - IotMqttOperation_t * pUnsubscribeOperation ); + IotMqttOperation_t * const pUnsubscribeOperation ); /* @[declare_mqtt_unsubscribe] */ /** @@ -642,7 +642,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection, const IotMqttPublishInfo_t * pPublishInfo, uint32_t flags, const IotMqttCallbackInfo_t * pCallbackInfo, - IotMqttOperation_t * pPublishOperation ); + IotMqttOperation_t * const pPublishOperation ); /* @[declare_mqtt_publish] */ /** @@ -817,7 +817,7 @@ const char * IotMqtt_OperationType( IotMqttOperationType_t operation ); bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection, const char * pTopicFilter, uint16_t topicFilterLength, - IotMqttSubscription_t * pCurrentSubscription ); + IotMqttSubscription_t * const pCurrentSubscription ); /* @[declare_mqtt_issubscribed] */ #endif /* ifndef IOT_MQTT_H_ */ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c index 23133f46b..0d6a259ed 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c @@ -94,7 +94,7 @@ static void _mqttSubscription_tryDestroy( void * pData ); static void _mqttOperation_tryDestroy( void * pData ); /** - * @brief Create a keep-alive job for an MQTT connection. + * @brief Initialize the keep-alive operation for an MQTT connection. * * @param[in] pNetworkInfo User-provided network information for the new * connection. @@ -103,9 +103,9 @@ static void _mqttOperation_tryDestroy( void * pData ); * * @return `true` if the keep-alive job was successfully created; `false` otherwise. */ -static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo, - uint16_t keepAliveSeconds, - _mqttConnection_t * pMqttConnection ); +static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo, + uint16_t keepAliveSeconds, + _mqttConnection_t * pMqttConnection ); /** * @brief Creates a new MQTT connection and initializes its members. @@ -141,7 +141,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation, size_t subscriptionCount, uint32_t flags, const IotMqttCallbackInfo_t * pCallbackInfo, - IotMqttOperation_t * pOperationReference ); + IotMqttOperation_t * const pOperationReference ); /*-----------------------------------------------------------*/ @@ -238,9 +238,9 @@ static void _mqttOperation_tryDestroy( void * pData ) /*-----------------------------------------------------------*/ -static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo, - uint16_t keepAliveSeconds, - _mqttConnection_t * pMqttConnection ) +static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo, + uint16_t keepAliveSeconds, + _mqttConnection_t * pMqttConnection ) { bool status = true; IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS; @@ -253,9 +253,12 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo, IotMqttError_t ( * serializePingreq )( uint8_t **, size_t * ) = _IotMqtt_SerializePingreq; + /* Set PINGREQ operation members. */ + pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ; + /* Convert the keep-alive interval to milliseconds. */ - pMqttConnection->keepAliveMs = keepAliveSeconds * 1000; - pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs; + pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000; + pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000; /* Choose a PINGREQ serializer function. */ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 @@ -277,8 +280,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo, #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ /* Generate a PINGREQ packet. */ - serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ), - &( pMqttConnection->pingreqPacketSize ) ); + serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ), + &( pMqttConnection->pingreq.u.operation.packetSize ) ); if( serializeStatus != IOT_MQTT_SUCCESS ) { @@ -291,8 +294,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo, /* Create the task pool job that processes keep-alive. */ jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive, pMqttConnection, - &( pMqttConnection->keepAliveJobStorage ), - &( pMqttConnection->keepAliveJob ) ); + &( pMqttConnection->pingreq.jobStorage ), + &( pMqttConnection->pingreq.job ) ); /* Task pool job creation for a pre-allocated job should never fail. * Abort the program if it does. */ @@ -408,9 +411,9 @@ static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode, /* Check if keep-alive is active for this connection. */ if( keepAliveSeconds != 0 ) { - if( _createKeepAliveJob( pNetworkInfo, - keepAliveSeconds, - pMqttConnection ) == false ) + if( _createKeepAliveOperation( pNetworkInfo, + keepAliveSeconds, + pMqttConnection ) == false ) { IOT_SET_AND_GOTO_CLEANUP( false ); } @@ -471,17 +474,31 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection ) { IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS; + /* Default free packet function. */ + void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket; + /* Clean up keep-alive if still allocated. */ - if( pMqttConnection->keepAliveMs != 0 ) + if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 ) { IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection ); - _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket ); + /* Choose a function to free the packet. */ + #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 + if( pMqttConnection->pSerializer != NULL ) + { + if( pMqttConnection->pSerializer->freePacket != NULL ) + { + freePacket = pMqttConnection->pSerializer->freePacket; + } + } + #endif + + freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket ); /* Clear data about the keep-alive. */ - pMqttConnection->keepAliveMs = 0; - pMqttConnection->pPingreqPacket = NULL; - pMqttConnection->pingreqPacketSize = 0; + pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0; + pMqttConnection->pingreq.u.operation.pMqttPacket = NULL; + pMqttConnection->pingreq.u.operation.packetSize = 0; /* Decrement reference count. */ pMqttConnection->references--; @@ -494,9 +511,9 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection ) /* A connection to be destroyed should have no keep-alive and at most 1 * reference. */ IotMqtt_Assert( pMqttConnection->references <= 1 ); - IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 ); - IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL ); - IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 ); + IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 ); + IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL ); + IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 ); /* Remove all subscriptions. */ IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) ); @@ -546,7 +563,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation, size_t subscriptionCount, uint32_t flags, const IotMqttCallbackInfo_t * pCallbackInfo, - IotMqttOperation_t * pOperationReference ) + IotMqttOperation_t * const pOperationReference ) { IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); _mqttOperation_t * pSubscriptionOperation = NULL; @@ -666,7 +683,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation, /* Check the subscription operation data and set the operation type. */ IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); - IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 ); + IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 ); pSubscriptionOperation->u.operation.type = operation; /* Generate a subscription packet from the subscription list. */ @@ -853,7 +870,7 @@ IotMqttError_t IotMqtt_Init( void ) #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ /* Log initialization status. */ - if( status != IOT_MQTT_SUCCESS ) //_RB_ This will generate compiler warnings if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 + if( status != IOT_MQTT_SUCCESS ) { IotLogError( "Failed to initialize MQTT library serializer. " ); } @@ -896,7 +913,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, _mqttConnection_t * pNewMqttConnection = NULL; /* Default CONNECT serializer function. */ - IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, //_RB_ Needs to be a typedef to make it easier to rease and more maintainable should the prototype change. + IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, uint8_t **, size_t * ) = _IotMqtt_SerializeConnect; @@ -911,7 +928,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, } /* Validate network interface and connect info. */ - if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) //_RB_ A lot of code in here that could be replaced by asserts(). + if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) { IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); } @@ -1002,7 +1019,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, IotLogInfo( "Establishing new MQTT connection." ); - /* Initialize a new MQTT connection object. *///_RB_ Initialise, as per the comment, or create, as per the function name? I don't think this does create a connection as that happens below. + /* Initialize a new MQTT connection object. */ pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode, pNetworkInfo, pConnectInfo->keepAliveSeconds ); @@ -1059,7 +1076,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ); - IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 ); + IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 ); /* Set the operation type. */ pOperation->u.operation.type = IOT_MQTT_CONNECT; @@ -1127,7 +1144,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, IotMqtt_Assert( pOperation->u.operation.packetSize > 0 ); /* Add the CONNECT operation to the send queue for network transmission. */ - status = _IotMqtt_ScheduleOperation( pOperation, // Why schedule a job if going to wait for comletion? + status = _IotMqtt_ScheduleOperation( pOperation, _IotMqtt_ProcessSend, 0 ); @@ -1150,13 +1167,13 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, if( status == IOT_MQTT_SUCCESS ) { /* Check if a keep-alive job should be scheduled. */ - if( pNewMqttConnection->keepAliveMs != 0 ) + if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 ) { IotLogDebug( "Scheduling first MQTT keep-alive job." ); taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL, - pNewMqttConnection->keepAliveJob, - pNewMqttConnection->nextKeepAliveMs ); + pNewMqttConnection->pingreq.job, + pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs ); if( taskPoolStatus != IOT_TASKPOOL_SUCCESS ) { @@ -1268,7 +1285,7 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection, IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ); - IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 ); + IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 ); /* Set the operation type. */ pOperation->u.operation.type = IOT_MQTT_DISCONNECT; @@ -1389,7 +1406,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection, size_t subscriptionCount, uint32_t flags, const IotMqttCallbackInfo_t * pCallbackInfo, - IotMqttOperation_t * pSubscribeOperation ) + IotMqttOperation_t * const pSubscribeOperation ) { return _subscriptionCommon( IOT_MQTT_SUBSCRIBE, mqttConnection, @@ -1445,7 +1462,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection, size_t subscriptionCount, uint32_t flags, const IotMqttCallbackInfo_t * pCallbackInfo, - IotMqttOperation_t * pUnsubscribeOperation ) + IotMqttOperation_t * const pUnsubscribeOperation ) { return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE, mqttConnection, @@ -1500,7 +1517,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection, const IotMqttPublishInfo_t * pPublishInfo, uint32_t flags, const IotMqttCallbackInfo_t * pCallbackInfo, - IotMqttOperation_t * pPublishOperation ) + IotMqttOperation_t * const pPublishOperation ) { IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); _mqttOperation_t * pOperation = NULL; @@ -1651,8 +1668,8 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection, /* A QoS 0 PUBLISH may not be retried. */ if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) { - pOperation->u.operation.retry.limit = pPublishInfo->retryLimit; - pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs; + pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit; + pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs; } else { diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c index 169a292df..42843d28c 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c @@ -43,6 +43,9 @@ /* Platform layer includes. */ #include "platform/iot_threads.h" +/* Atomics include. */ +#include "iot_atomic.h" + /*-----------------------------------------------------------*/ /** @@ -89,6 +92,22 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne static void _sendPuback( _mqttConnection_t * pMqttConnection, uint16_t packetIdentifier ); +/** + * @brief Flush a packet from the stream of incoming data. + * + * This function is called when memory for a packet cannot be allocated. The + * packet is flushed from the stream of incoming data so that the next packet + * may be read. + * + * @param[in] pNetworkConnection Network connection to use for receive, which + * may be different from the network connection associated with the MQTT connection. + * @param[in] pMqttConnection The associated MQTT connection. + * @param[in] length The length of the packet to flush. + */ +static void _flushPacket( void * pNetworkConnection, + const _mqttConnection_t * pMqttConnection, + size_t length ); + /*-----------------------------------------------------------*/ static bool _incomingPacketValid( uint8_t packetType ) @@ -201,6 +220,14 @@ static IotMqttError_t _getIncomingPacket( void * pNetworkConnection, if( pIncomingPacket->pRemainingData == NULL ) { + IotLogError( "(MQTT connection %p) Failed to allocate buffer of length " + "%lu for incoming packet type %lu.", + pMqttConnection, + ( unsigned long ) pIncomingPacket->remainingLength, + ( unsigned long ) pIncomingPacket->type ); + + _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength ); + IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY ); } else @@ -593,22 +620,18 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne if( status == IOT_MQTT_SUCCESS ) { - IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); - - if( pMqttConnection->keepAliveFailure == false ) + if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ), + 0, + 1 ) == 1 ) { - IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.", - pMqttConnection ); + IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.", + pMqttConnection ); } else { - IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.", - pMqttConnection ); - - pMqttConnection->keepAliveFailure = false; + IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.", + pMqttConnection ); } - - IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); } else { @@ -637,15 +660,13 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne static void _sendPuback( _mqttConnection_t * pMqttConnection, uint16_t packetIdentifier ) { - IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS; - uint8_t * pPuback = NULL; - size_t pubackSize = 0, bytesSent = 0; + IotMqttError_t status = IOT_MQTT_STATUS_PENDING; + _mqttOperation_t * pPubackOperation = NULL; - /* Default PUBACK serializer and free packet functions. */ + /* Default PUBACK serializer function. */ IotMqttError_t ( * serializePuback )( uint16_t, uint8_t **, size_t * ) = _IotMqtt_SerializePuback; - void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket; IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.", pMqttConnection, @@ -669,57 +690,82 @@ static void _sendPuback( _mqttConnection_t * pMqttConnection, EMPTY_ELSE_MARKER; } #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ - #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 - if( pMqttConnection->pSerializer != NULL ) - { - if( pMqttConnection->pSerializer->freePacket != NULL ) - { - freePacket = pMqttConnection->pSerializer->freePacket; - } - else - { - EMPTY_ELSE_MARKER; - } - } - else - { - EMPTY_ELSE_MARKER; - } - #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ + + /* Create a PUBACK operation. */ + status = _IotMqtt_CreateOperation( pMqttConnection, + 0, + NULL, + &pPubackOperation ); + + if( status != IOT_MQTT_SUCCESS ) + { + IOT_GOTO_CLEANUP(); + } + + /* Set the operation type. */ + pPubackOperation->u.operation.type = IOT_MQTT_PUBACK; /* Generate a PUBACK packet from the packet identifier. */ - serializeStatus = serializePuback( packetIdentifier, - &pPuback, - &pubackSize ); + status = serializePuback( packetIdentifier, + &( pPubackOperation->u.operation.pMqttPacket ), + &( pPubackOperation->u.operation.packetSize ) ); + + if( status != IOT_MQTT_SUCCESS ) + { + IOT_GOTO_CLEANUP(); + } + + /* Add the PUBACK operation to the send queue for network transmission. */ + status = _IotMqtt_ScheduleOperation( pPubackOperation, + _IotMqtt_ProcessSend, + 0 ); - if( serializeStatus != IOT_MQTT_SUCCESS ) + if( status != IOT_MQTT_SUCCESS ) { - IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for " - "received PUBLISH %hu.", - pMqttConnection, - packetIdentifier ); + IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.", + pMqttConnection ); + + IOT_GOTO_CLEANUP(); } else { - bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection, - pPuback, - pubackSize ); + EMPTY_ELSE_MARKER; + } + + /* Clean up on error. */ + IOT_FUNCTION_CLEANUP_BEGIN(); - if( bytesSent != pubackSize ) + if( status != IOT_MQTT_SUCCESS ) + { + if( pPubackOperation != NULL ) { - IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received" - " PUBLISH %hu.", - pMqttConnection, - packetIdentifier ); + _IotMqtt_DestroyOperation( pPubackOperation ); } else { - IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.", - pMqttConnection, - packetIdentifier ); + EMPTY_ELSE_MARKER; } + } + else + { + EMPTY_ELSE_MARKER; + } +} + +/*-----------------------------------------------------------*/ - freePacket( pPuback ); +static void _flushPacket( void * pNetworkConnection, + const _mqttConnection_t * pMqttConnection, + size_t length ) +{ + size_t bytesFlushed = 0; + uint8_t receivedByte = 0; + + for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ ) + { + ( void ) _IotMqtt_GetNextByte( pNetworkConnection, + pMqttConnection->pNetworkInterface, + &receivedByte ); } } @@ -761,17 +807,27 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS; IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } }; + void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL; + + /* Disconnect callback function. */ + void ( * disconnectCallback )( void *, + IotMqttCallbackParam_t * ) = NULL; + + /* Network close function. */ + IotNetworkError_t ( * closeConnection) ( void * ) = NULL; + + /* Default free packet function. */ + void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket; /* Mark the MQTT connection as disconnected and the keep-alive as failed. */ IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); pMqttConnection->disconnected = true; - pMqttConnection->keepAliveFailure = true; - if( pMqttConnection->keepAliveMs != 0 ) + if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 ) { /* Keep-alive must have a PINGREQ allocated. */ - IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL ); - IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 ); + IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL ); + IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 ); /* PINGREQ provides a reference to the connection, so reference count must * be nonzero. */ @@ -779,7 +835,7 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason /* Attempt to cancel the keep-alive job. */ taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL, - pMqttConnection->keepAliveJob, + pMqttConnection->pingreq.job, NULL ); /* If the keep-alive job was not canceled, it must be already executing. @@ -791,13 +847,23 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason * the executing keep-alive job will clean up itself. */ if( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) { - /* Clean up PINGREQ packet and job. */ - _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket ); + /* Choose a function to free the packet. */ + #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 + if( pMqttConnection->pSerializer != NULL ) + { + if( pMqttConnection->pSerializer->freePacket != NULL ) + { + freePacket = pMqttConnection->pSerializer->freePacket; + } + } + #endif + + freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket ); /* Clear data about the keep-alive. */ - pMqttConnection->keepAliveMs = 0; - pMqttConnection->pPingreqPacket = NULL; - pMqttConnection->pingreqPacketSize = 0; + pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0; + pMqttConnection->pingreq.u.operation.pMqttPacket = NULL; + pMqttConnection->pingreq.u.operation.packetSize = 0; /* Keep-alive is cleaned up; decrement reference count. Since this * function must be followed with a call to DISCONNECT, a check to @@ -817,12 +883,20 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason EMPTY_ELSE_MARKER; } + /* Copy the function pointers and contexts, as the MQTT connection may be + * modified after the mutex is released. */ + disconnectCallback = pMqttConnection->disconnectCallback.function; + pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext; + + closeConnection = pMqttConnection->pNetworkInterface->close; + pNetworkConnection = pMqttConnection->pNetworkConnection; + IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); /* Close the network connection. */ - if( pMqttConnection->pNetworkInterface->close != NULL ) + if( closeConnection != NULL ) { - closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection ); + closeStatus = closeConnection( pNetworkConnection ); if( closeStatus == IOT_NETWORK_SUCCESS ) { @@ -842,14 +916,14 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason } /* Invoke the disconnect callback. */ - if( pMqttConnection->disconnectCallback.function != NULL ) + if( disconnectCallback != NULL ) { /* Set the members of the callback parameter. */ callbackParam.mqttConnection = pMqttConnection; callbackParam.u.disconnectReason = disconnectReason; - pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext, - &callbackParam ); + disconnectCallback( pDisconnectCallbackContext, + &callbackParam ); } else { diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c index 9e4ec282e..af343f637 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c @@ -44,6 +44,9 @@ #include "platform/iot_clock.h" #include "platform/iot_threads.h" +/* Atomics include. */ +#include "iot_atomic.h" + /*-----------------------------------------------------------*/ /** @@ -133,7 +136,7 @@ static bool _mqttOperation_match( const IotLink_t * pOperationLink, static bool _checkRetryLimit( _mqttOperation_t * pOperation ) { _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection; - bool status = true; + bool status = true, setDup = false; /* Choose a set DUP function. */ void ( * publishSetDup )( uint8_t *, @@ -162,36 +165,65 @@ static bool _checkRetryLimit( _mqttOperation_t * pOperation ) IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER ); /* Check if the retry limit is exhausted. */ - if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit ) + if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit ) { /* The retry count may be at most one more than the retry limit, which * accounts for the final check for a PUBACK. */ - IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 ); + IotMqtt_Assert( pOperation->u.operation.periodic.retry.count == + pOperation->u.operation.periodic.retry.limit + 1 ); IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.", pMqttConnection, pOperation, - pOperation->u.operation.retry.limit ); + pOperation->u.operation.periodic.retry.limit ); status = false; } - /* Check if this is the first retry. */ - else if( pOperation->u.operation.retry.count == 1 ) - { - /* Always set the DUP flag on the first retry. */ - publishSetDup( pOperation->u.operation.pMqttPacket, - pOperation->u.operation.pPacketIdentifierHigh, - &( pOperation->u.operation.packetIdentifier ) ); - } else { - /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet - * identifier) must be reset on every retry. */ - if( pMqttConnection->awsIotMqttMode == true ) + if( pOperation->u.operation.periodic.retry.count == 1 ) + { + /* The DUP flag should always be set on the first retry. */ + setDup = true; + } + else if( pMqttConnection->awsIotMqttMode == true ) { + /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet + * identifier) must be reset on every retry. */ + setDup = true; + } + else + { + EMPTY_ELSE_MARKER; + } + + if( setDup == true ) + { + /* In AWS IoT MQTT mode, the references mutex must be locked to + * prevent the packet identifier from being read while it is being + * changed. */ + if( pMqttConnection->awsIotMqttMode == true ) + { + IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); + } + else + { + EMPTY_ELSE_MARKER; + } + + /* Always set the DUP flag on the first retry. */ publishSetDup( pOperation->u.operation.pMqttPacket, pOperation->u.operation.pPacketIdentifierHigh, &( pOperation->u.operation.packetIdentifier ) ); + + if( pMqttConnection->awsIotMqttMode == true ) + { + IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); + } + else + { + EMPTY_ELSE_MARKER; + } } else { @@ -213,14 +245,16 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation ) /* This function should never be called with retry count greater than * retry limit. */ - IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit ); + IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <= + pOperation->u.operation.periodic.retry.limit ); /* Increment the retry count. */ - ( pOperation->u.operation.retry.count )++; + ( pOperation->u.operation.periodic.retry.count )++; /* Check for a response shortly for the final retry. Otherwise, calculate the * next retry period. */ - if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit ) + if( pOperation->u.operation.periodic.retry.count > + pOperation->u.operation.periodic.retry.limit ) { scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS; @@ -232,14 +266,14 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation ) } else { - scheduleDelay = pOperation->u.operation.retry.nextPeriod; + scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs; /* Double the retry period, subject to a ceiling value. */ - pOperation->u.operation.retry.nextPeriod *= 2; + pOperation->u.operation.periodic.retry.nextPeriodMs *= 2; - if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING ) + if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING ) { - pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING; + pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING; } else { @@ -249,12 +283,12 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation ) IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.", pMqttConnection, pOperation, - ( unsigned long ) pOperation->u.operation.retry.count, - ( unsigned long ) pOperation->u.operation.retry.limit, + ( unsigned long ) pOperation->u.operation.periodic.retry.count, + ( unsigned long ) pOperation->u.operation.periodic.retry.limit, ( unsigned long ) scheduleDelay ); /* Check if this is the first retry. */ - firstRetry = ( pOperation->u.operation.retry.count == 1 ); + firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 ); /* On the first retry, the PUBLISH will be moved from the pending processing * list to the pending responses list. Lock the connection references mutex @@ -332,7 +366,7 @@ IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection, { IotLogError( "Callback should not be set for a waitable operation." ); - return IOT_MQTT_BAD_PARAMETER; + IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); } else { @@ -509,8 +543,8 @@ bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation, pMqttConnection, IotMqtt_OperationType( pOperation->u.operation.type ), pOperation, - pOperation->u.operation.jobReference + 1, - pOperation->u.operation.jobReference ); + ( long ) ( pOperation->u.operation.jobReference + 1 ), + ( long ) ( pOperation->u.operation.jobReference ) ); /* The job reference count must be 0 or 1 after the decrement. */ IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) || @@ -649,23 +683,30 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, void * pContext ) { bool status = true; + uint32_t swapStatus = 0; IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; size_t bytesSent = 0; + /* Swap status is not checked when asserts are disabled. */ + ( void ) swapStatus; + /* Retrieve the MQTT connection from the context. */ _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext; + _mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq ); /* Check parameters. */ /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */ - IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob ); + IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job ); /* Check that keep-alive interval is valid. The MQTT spec states its maximum * value is 65,535 seconds. */ - IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 ); + IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 ); /* Only two values are valid for the next keep alive job delay. */ - IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) || - ( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) ); + IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs == + pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) || + ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs + == IOT_MQTT_RESPONSE_WAIT_MS ) ); IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection ); @@ -676,10 +717,9 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, &pKeepAliveJob ); IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS ); - IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); - /* Determine whether to send a PINGREQ or check for PINGRESP. */ - if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) + if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs == + pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) { IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection ); @@ -687,10 +727,10 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, * more important than other operations. Bypass the queue of jobs for * operations by directly sending the PINGREQ in this job. */ bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection, - pMqttConnection->pPingreqPacket, - pMqttConnection->pingreqPacketSize ); + pPingreqOperation->u.operation.pMqttPacket, + pPingreqOperation->u.operation.packetSize ); - if( bytesSent != pMqttConnection->pingreqPacketSize ) + if( bytesSent != pPingreqOperation->u.operation.packetSize ) { IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection ); status = false; @@ -699,10 +739,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, { /* Assume the keep-alive will fail. The network receive callback will * clear the failure flag upon receiving a PINGRESP. */ - pMqttConnection->keepAliveFailure = true; + swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), + 1, + 0 ); + IotMqtt_Assert( swapStatus == 1 ); /* Schedule a check for PINGRESP. */ - pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS; + pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS; IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.", pMqttConnection, @@ -713,12 +756,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, { IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection ); - if( pMqttConnection->keepAliveFailure == false ) + if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 ) { IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection ); /* PINGRESP was received. Schedule the next PINGREQ transmission. */ - pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs; + pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = + pPingreqOperation->u.operation.periodic.ping.keepAliveMs; } else { @@ -737,13 +781,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, { taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool, pKeepAliveJob, - pMqttConnection->nextKeepAliveMs ); + pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ); if( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) { - IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.", + IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.", pMqttConnection, - IOT_MQTT_RESPONSE_WAIT_MS ); + ( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ); } else { @@ -769,8 +813,6 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, { EMPTY_ELSE_MARKER; } - - IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); } /*-----------------------------------------------------------*/ @@ -799,6 +841,8 @@ void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool, } else { + /* This operation may have already been removed by cleanup of pending + * operations (called from Disconnect). In that case, do nothing here. */ EMPTY_ELSE_MARKER; } @@ -810,15 +854,9 @@ void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool, _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection, &callbackParam ); - /* Free any buffers associated with the current PUBLISH message. */ - if( pOperation->u.publish.pReceivedData != NULL ) - { - IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData ); - } - else - { - EMPTY_ELSE_MARKER; - } + /* Free buffers associated with the current PUBLISH message. */ + IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL ); + IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData ); /* Free the incoming PUBLISH operation. */ IotMqtt_FreeOperation( pOperation ); @@ -851,7 +889,7 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool, waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE; /* Check PUBLISH retry counts and limits. */ - if( pOperation->u.operation.retry.limit > 0 ) + if( pOperation->u.operation.periodic.retry.limit > 0 ) { if( _checkRetryLimit( pOperation ) == false ) { @@ -923,7 +961,7 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool, if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ) { /* Check if this operation should be scheduled for retransmission. */ - if( pOperation->u.operation.retry.limit > 0 ) + if( pOperation->u.operation.periodic.retry.limit > 0 ) { if( _scheduleNextRetry( pOperation ) == false ) { @@ -1141,7 +1179,7 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection, /* Check if the matched operation is a PUBLISH with retry. If it is, cancel * the retry job. */ - if( pResult->u.operation.retry.limit > 0 ) + if( pResult->u.operation.periodic.retry.limit > 0 ) { taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL, pResult->job, @@ -1188,7 +1226,7 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection, if( pResult != NULL ) { - IotLogDebug( "(MQTT connection %p) Found operation %s." , + IotLogDebug( "(MQTT connection %p) Found operation %s.", pMqttConnection, IotMqtt_OperationType( type ) ); diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c index f42c80c6a..d3447eefd 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c @@ -1375,12 +1375,12 @@ IotMqttError_t _IotMqtt_DeserializePublish( _mqttPacket_t * pPublish ) * a packet identifer, but QoS 0 PUBLISH packets do not. */ if( pOutput->qos == IOT_MQTT_QOS_0 ) { - pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) ); + pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) ); pOutput->pPayload = pPacketIdentifierHigh; } else { - pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) ); + pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) ); pOutput->pPayload = pPacketIdentifierHigh + sizeof( uint16_t ); } diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c index 59caccc7f..f4546b558 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c @@ -586,7 +586,7 @@ void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnecti bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection, const char * pTopicFilter, uint16_t topicFilterLength, - IotMqttSubscription_t * pCurrentSubscription ) + IotMqttSubscription_t * const pCurrentSubscription ) { bool status = false; _mqttSubscription_t * pSubscription = NULL; @@ -595,7 +595,7 @@ bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection, topicMatchParams.pTopicName = pTopicFilter; topicMatchParams.topicNameLength = topicFilterLength; - topicMatchParams.exactMatchOnly = false; + topicMatchParams.exactMatchOnly = true; /* Prevent any other thread from modifying the subscription list while this * function is running. */ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h index 80aef1c60..5ac1b9489 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h @@ -248,69 +248,13 @@ /*---------------------- MQTT internal data structures ----------------------*/ /** - * @brief Represents an MQTT connection. - */ -typedef struct _mqttConnection -{ - bool awsIotMqttMode; /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */ - bool ownNetworkConnection; /**< @brief Whether this MQTT connection owns its network connection. */ - void * pNetworkConnection; /**< @brief References the transport-layer network connection. */ - const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */ - IotMqttCallbackInfo_t disconnectCallback; /**< @brief A function to invoke when this connection is disconnected. */ - - #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 - const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */ - #endif - - bool disconnected; /**< @brief Tracks if this connection has been disconnected. */ - IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */ - int32_t references; /**< @brief Counts callbacks and operations using this connection. */ - IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */ - IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */ - - IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */ - IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */ - - bool keepAliveFailure; /**< @brief Failure flag for keep-alive operation. */ - uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */ - uint32_t nextKeepAliveMs; /**< @brief Relative delay for next keep-alive job. */ - IotTaskPoolJobStorage_t keepAliveJobStorage; /**< @brief Task pool job for processing this connection's keep-alive. */ - IotTaskPoolJob_t keepAliveJob; /**< @brief Task pool job for processing this connection's keep-alive. */ - uint8_t * pPingreqPacket; /**< @brief An MQTT PINGREQ packet, allocated if keep-alive is active. */ - size_t pingreqPacketSize; /**< @brief The size of an allocated PINGREQ packet. */ -} _mqttConnection_t; - -/** - * @brief Represents a subscription stored in an MQTT connection. + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section. + * + * Forward declaration of MQTT connection type. */ -typedef struct _mqttSubscription -{ - IotLink_t link; /**< @brief List link member. */ - - int32_t references; /**< @brief How many subscription callbacks are using this subscription. */ - - /** - * @brief Tracks whether @ref mqtt_function_unsubscribe has been called for - * this subscription. - * - * If there are active subscription callbacks, @ref mqtt_function_unsubscribe - * cannot remove this subscription. Instead, it will set this flag, which - * schedules the removal of this subscription once all subscription callbacks - * terminate. - */ - bool unsubscribed; - - struct - { - uint16_t identifier; /**< @brief Packet identifier. */ - size_t order; /**< @brief Order in the packet's list of subscriptions. */ - } packetInfo; /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */ - - IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */ - - uint16_t topicFilterLength; /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */ - char pTopicFilter[]; /**< @brief The subscription topic filter. */ -} _mqttSubscription_t; +struct _mqttConnection; +/** @endcond */ /** * @brief Internal structure representing a single MQTT operation, such as @@ -321,13 +265,13 @@ typedef struct _mqttSubscription typedef struct _mqttOperation { /* Pointers to neighboring queue elements. */ - IotLink_t link; /**< @brief List link member. */ + IotLink_t link; /**< @brief List link member. */ - bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */ - _mqttConnection_t * pMqttConnection; /**< @brief MQTT connection associated with this operation. */ + bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */ + struct _mqttConnection * pMqttConnection; /**< @brief MQTT connection associated with this operation. */ - IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */ - IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */ + IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */ + IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */ union { @@ -353,12 +297,22 @@ typedef struct _mqttOperation } notify; /**< @brief How to notify of this operation's completion. */ IotMqttError_t status; /**< @brief Result of this operation. This is reported once a response is received. */ - struct + union { - uint32_t count; - uint32_t limit; - uint32_t nextPeriod; - } retry; + struct + { + uint32_t count; /**< @brief Current number of retries. */ + uint32_t limit; /**< @brief Maximum number of retries allowed. */ + uint32_t nextPeriodMs; /**< @brief Next retry period. */ + } retry; /**< @brief Additional information for PUBLISH retry. */ + + struct + { + uint32_t failure; /**< @brief Flag tracking keep-alive status. */ + uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */ + uint32_t nextPeriodMs; /**< @brief Relative delay for next keep-alive job. */ + } ping; /**< @brief Additional information for keep-alive pings. */ + } periodic; /**< @brief Additional information for periodic operations. */ } operation; /* If incomingPublish is true, this struct is valid. */ @@ -370,6 +324,65 @@ typedef struct _mqttOperation } u; /**< @brief Valid member depends on _mqttOperation_t.incomingPublish. */ } _mqttOperation_t; +/** + * @brief Represents an MQTT connection. + */ +typedef struct _mqttConnection +{ + bool awsIotMqttMode; /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */ + bool ownNetworkConnection; /**< @brief Whether this MQTT connection owns its network connection. */ + void * pNetworkConnection; /**< @brief References the transport-layer network connection. */ + const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */ + IotMqttCallbackInfo_t disconnectCallback; /**< @brief A function to invoke when this connection is disconnected. */ + + #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 + const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */ + #endif + + bool disconnected; /**< @brief Tracks if this connection has been disconnected. */ + IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */ + int32_t references; /**< @brief Counts callbacks and operations using this connection. */ + IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */ + IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */ + + IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */ + IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */ + + _mqttOperation_t pingreq; /**< @brief Operation used for MQTT keep-alive. */ +} _mqttConnection_t; + +/** + * @brief Represents a subscription stored in an MQTT connection. + */ +typedef struct _mqttSubscription +{ + IotLink_t link; /**< @brief List link member. */ + + int32_t references; /**< @brief How many subscription callbacks are using this subscription. */ + + /** + * @brief Tracks whether @ref mqtt_function_unsubscribe has been called for + * this subscription. + * + * If there are active subscription callbacks, @ref mqtt_function_unsubscribe + * cannot remove this subscription. Instead, it will set this flag, which + * schedules the removal of this subscription once all subscription callbacks + * terminate. + */ + bool unsubscribed; + + struct + { + uint16_t identifier; /**< @brief Packet identifier. */ + size_t order; /**< @brief Order in the packet's list of subscriptions. */ + } packetInfo; /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */ + + IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */ + + uint16_t topicFilterLength; /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */ + char pTopicFilter[]; /**< @brief The subscription topic filter. */ +} _mqttSubscription_t; + /** * @brief Represents an MQTT packet received from the network. * -- 2.39.2