--- /dev/null
+[{000214A0-0000-0000-C000-000000000046}]\r
+Prop3=19,11\r
+[InternetShortcut]\r
+IDList=\r
+URL=https://www.freertos.org/mqtt/\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\private\iot_taskpool_internal.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\types\iot_taskpool_types.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt.h" />\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent.h" />\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent_config_defaults.h" />\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_config_defaults.h" />\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_lib.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\types\iot_mqtt_types.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOSIPConfigDefaults.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOS_ARP.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt.h">\r
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
</ClInclude>\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent.h">\r
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
- </ClInclude>\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent_config_defaults.h">\r
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
- </ClInclude>\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_config_defaults.h">\r
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
- </ClInclude>\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_lib.h">\r
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
- </ClInclude>\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\types\iot_mqtt_types.h">\r
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include\types</Filter>\r
</ClInclude>\r
* 1 tab == 4 spaces!\r
*/\r
\r
-/*\r
- * TBD\r
- */\r
+ /***\r
+ * See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions.\r
+ ***/\r
\r
/* Standard includes. */\r
#include <stdio.h>\r
\r
int main( void )\r
{\r
- /*\r
- * Instructions for using this project are provided on:\r
- * TBD\r
- */\r
+ /***\r
+ * See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions.\r
+ ***/\r
\r
/* Miscellaneous initialisation including preparing the logging and seeding\r
the random number generator. */\r
* 1 tab == 4 spaces!\r
*/\r
\r
-//_RB_ Add link to docs here.\r
\r
/* Kernel includes. */\r
#include "FreeRTOS.h"\r
--- /dev/null
+[{000214A0-0000-0000-C000-000000000046}]\r
+Prop3=19,11\r
+[InternetShortcut]\r
+IDList=\r
+URL=https://www.freertos.org/task-pool/\r
+HotKey=0\r
<ClInclude Include="..\..\..\..\FreeRTOS\Source\include\task.h" />\r
<ClInclude Include="..\..\..\..\FreeRTOS\Source\include\timers.h" />\r
<ClInclude Include="..\..\..\..\FreeRTOS\Source\portable\MSVC-MingW\portmacro.h" />\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_afr.h" />\r
+ <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_freertos.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\include\types\iot_platform_types.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\iot_taskpool.h" />\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\private\iot_error.h" />\r
</ClInclude>\r
<ClInclude Include="iot_config.h" />\r
<ClInclude Include="iot_config_common.h" />\r
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_afr.h">\r
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform</Filter>\r
- </ClInclude>\r
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\include\types\iot_platform_types.h">\r
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\include\types</Filter>\r
</ClInclude>\r
+ <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_freertos.h">\r
+ <Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform\types</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
* 1 tab == 4 spaces!\r
*/\r
\r
+ /***\r
+ * See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions.\r
+ ***/\r
+\r
+\r
/* Standard includes. */\r
#include <stdio.h>\r
#include <time.h>\r
\r
int main( void )\r
{\r
- /*\r
- * Instructions for using this project are provided on:\r
- * TBD\r
- */\r
+ /***\r
+ * See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions.\r
+ ***/\r
\r
/* Create the example that demonstrates task pool functionality. Examples\r
that demonstrate networking connectivity will be added in future projects\r
--- /dev/null
+/*\r
+ * FreeRTOS Kernel V10.2.0\r
+ * Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ *\r
+ * http://www.FreeRTOS.org\r
+ * http://aws.amazon.com/freertos\r
+ *\r
+ * 1 tab == 4 spaces!\r
+ */\r
+\r
+/**\r
+ * @file atomic.h\r
+ * @brief FreeRTOS atomic operation support.\r
+ *\r
+ * Two implementations of atomic are given in this header file:\r
+ * 1. Disabling interrupt globally.\r
+ * 2. ISA native atomic support.\r
+ * The former is available to all ports (compiler-architecture combination),\r
+ * while the latter is only available to ports compiling with GCC (version at\r
+ * least 4.7.0), which also have ISA atomic support.\r
+ *\r
+ * User can select which implementation to use by:\r
+ * setting/clearing configUSE_ATOMIC_INSTRUCTION in FreeRTOSConfig.h.\r
+ * Define AND set configUSE_ATOMIC_INSTRUCTION to 1 for ISA native atomic support.\r
+ * Undefine OR clear configUSE_ATOMIC_INSTRUCTION for disabling global interrupt\r
+ * implementation.\r
+ *\r
+ * @see GCC Built-in Functions for Memory Model Aware Atomic Operations\r
+ * https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html\r
+ */\r
+\r
+#ifndef ATOMIC_H\r
+#define ATOMIC_H\r
+\r
+#ifndef INC_FREERTOS_H\r
+ #error "include FreeRTOS.h must appear in source files before include atomic.h"\r
+#endif\r
+\r
+/* Standard includes. */\r
+#include <stdint.h>\r
+\r
+#ifdef __cplusplus\r
+extern "C" {\r
+#endif\r
+\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ /* Needed for __atomic_compare_exchange() weak=false. */\r
+ #include <stdbool.h>\r
+\r
+ /* This branch is for GCC compiler and GCC compiler only. */\r
+ #ifndef portFORCE_INLINE\r
+ #define portFORCE_INLINE inline __attribute__((always_inline))\r
+ #endif\r
+\r
+#else\r
+\r
+ /* Port specific definitions -- entering/exiting critical section.\r
+ * Refer template -- ./lib/FreeRTOS/portable/Compiler/Arch/portmacro.h\r
+ *\r
+ * Every call to ATOMIC_EXIT_CRITICAL() must be closely paired with\r
+ * ATOMIC_ENTER_CRITICAL().\r
+ */\r
+ #if defined( portSET_INTERRUPT_MASK_FROM_ISR )\r
+\r
+ /* Nested interrupt scheme is supported in this port. */\r
+ #define ATOMIC_ENTER_CRITICAL() \\r
+ UBaseType_t uxCriticalSectionType = portSET_INTERRUPT_MASK_FROM_ISR()\r
+\r
+ #define ATOMIC_EXIT_CRITICAL() \\r
+ portCLEAR_INTERRUPT_MASK_FROM_ISR( uxCriticalSectionType )\r
+\r
+ #else\r
+\r
+ /* Nested interrupt scheme is NOT supported in this port. */\r
+ #define ATOMIC_ENTER_CRITICAL() portENTER_CRITICAL()\r
+ #define ATOMIC_EXIT_CRITICAL() portEXIT_CRITICAL()\r
+\r
+ #endif /* portSET_INTERRUPT_MASK_FROM_ISR() */\r
+\r
+ /* Port specific definition -- "always inline". \r
+ * Inline is compiler specific, and may not always get inlined depending on your optimization level. \r
+ * Also, inline is considerred as performance optimization for atomic. \r
+ * Thus, if portFORCE_INLINE is not provided by portmacro.h, instead of resulting error,\r
+ * simply define it. \r
+ */\r
+ #ifndef portFORCE_INLINE\r
+ #define portFORCE_INLINE \r
+ #endif\r
+\r
+#endif /* configUSE_GCC_BUILTIN_ATOMICS */\r
+\r
+#define ATOMIC_COMPARE_AND_SWAP_SUCCESS 0x1U /**< Compare and swap succeeded, swapped. */\r
+#define ATOMIC_COMPARE_AND_SWAP_FAILURE 0x0U /**< Compare and swap failed, did not swap. */\r
+\r
+/*----------------------------- Swap && CAS ------------------------------*/\r
+\r
+/**\r
+ * Atomic compare-and-swap\r
+ *\r
+ * @brief Performs an atomic compare-and-swap operation on the specified values.\r
+ *\r
+ * @param[in, out] pDestination Pointer to memory location from where value is\r
+ * to be loaded and checked.\r
+ * @param[in] ulExchange If condition meets, write this value to memory.\r
+ * @param[in] ulComparand Swap condition.\r
+ *\r
+ * @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped.\r
+ *\r
+ * @note This function only swaps *pDestination with ulExchange, if previous\r
+ * *pDestination value equals ulComparand.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_CompareAndSwap_u32(\r
+ uint32_t volatile * pDestination,\r
+ uint32_t ulExchange,\r
+ uint32_t ulComparand )\r
+{\r
+\r
+ uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE;\r
+\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ if ( __atomic_compare_exchange( pDestination,\r
+ &ulComparand,\r
+ &ulExchange,\r
+ false,\r
+ __ATOMIC_SEQ_CST,\r
+ __ATOMIC_SEQ_CST ) )\r
+ {\r
+ ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;\r
+ }\r
+\r
+#else\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ if ( *pDestination == ulComparand )\r
+ {\r
+ *pDestination = ulExchange;\r
+ ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;\r
+ }\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+#endif\r
+\r
+ return ulReturnValue;\r
+\r
+}\r
+\r
+/**\r
+ * Atomic swap (pointers)\r
+ *\r
+ * @brief Atomically sets the address pointed to by *ppDestination to the value\r
+ * of *pExchange.\r
+ *\r
+ * @param[in, out] ppDestination Pointer to memory location from where a pointer\r
+ * value is to be loaded and written back to.\r
+ * @param[in] pExchange Pointer value to be written to *ppDestination.\r
+ *\r
+ * @return The initial value of *ppDestination.\r
+ */\r
+static portFORCE_INLINE void * Atomic_SwapPointers_p32(\r
+ void * volatile * ppDestination,\r
+ void * pExchange )\r
+{\r
+ void * pReturnValue;\r
+\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ __atomic_exchange( ppDestination, &pExchange, &pReturnValue, __ATOMIC_SEQ_CST );\r
+\r
+#else\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ pReturnValue = *ppDestination;\r
+\r
+ *ppDestination = pExchange;\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+#endif\r
+\r
+ return pReturnValue;\r
+}\r
+\r
+/**\r
+ * Atomic compare-and-swap (pointers)\r
+ *\r
+ * @brief Performs an atomic compare-and-swap operation on the specified pointer\r
+ * values.\r
+ *\r
+ * @param[in, out] ppDestination Pointer to memory location from where a pointer\r
+ * value is to be loaded and checked.\r
+ * @param[in] pExchange If condition meets, write this value to memory.\r
+ * @param[in] pComparand Swap condition.\r
+ *\r
+ * @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped.\r
+ *\r
+ * @note This function only swaps *ppDestination with pExchange, if previous\r
+ * *ppDestination value equals pComparand.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_CompareAndSwapPointers_p32(\r
+ void * volatile * ppDestination,\r
+ void * pExchange, void * pComparand )\r
+{\r
+ uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE;\r
+\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+ if ( __atomic_compare_exchange( ppDestination,\r
+ &pComparand,\r
+ &pExchange,\r
+ false,\r
+ __ATOMIC_SEQ_CST,\r
+ __ATOMIC_SEQ_CST ) )\r
+ {\r
+ ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;\r
+ }\r
+\r
+#else\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ if ( *ppDestination == pComparand )\r
+ {\r
+ *ppDestination = pExchange;\r
+ ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;\r
+ }\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+#endif\r
+\r
+ return ulReturnValue;\r
+}\r
+\r
+\r
+/*----------------------------- Arithmetic ------------------------------*/\r
+\r
+/**\r
+ * Atomic add\r
+ *\r
+ * @brief Atomically adds count to the value of the specified pointer points to.\r
+ *\r
+ * @param[in,out] pAddend Pointer to memory location from where value is to be\r
+ * loaded and written back to.\r
+ * @param[in] ulCount Value to be added to *pAddend.\r
+ *\r
+ * @return previous *pAddend value.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_Add_u32(\r
+ uint32_t volatile * pAddend,\r
+ uint32_t ulCount )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ return __atomic_fetch_add(pAddend, ulCount, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+ uint32_t ulCurrent;\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ ulCurrent = *pAddend;\r
+\r
+ *pAddend += ulCount;\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+ return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic subtract\r
+ *\r
+ * @brief Atomically subtracts count from the value of the specified pointer\r
+ * pointers to.\r
+ *\r
+ * @param[in,out] pAddend Pointer to memory location from where value is to be\r
+ * loaded and written back to.\r
+ * @param[in] ulCount Value to be subtract from *pAddend.\r
+ *\r
+ * @return previous *pAddend value.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_Subtract_u32(\r
+ uint32_t volatile * pAddend,\r
+ uint32_t ulCount )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ return __atomic_fetch_sub(pAddend, ulCount, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+ uint32_t ulCurrent;\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ ulCurrent = *pAddend;\r
+\r
+ *pAddend -= ulCount;\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+ return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic increment\r
+ *\r
+ * @brief Atomically increments the value of the specified pointer points to.\r
+ *\r
+ * @param[in,out] pAddend Pointer to memory location from where value is to be\r
+ * loaded and written back to.\r
+ *\r
+ * @return *pAddend value before increment.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_Increment_u32( uint32_t volatile * pAddend )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ return __atomic_fetch_add(pAddend, 1, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+ uint32_t ulCurrent;\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ ulCurrent = *pAddend;\r
+\r
+ *pAddend += 1;\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+ return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic decrement\r
+ *\r
+ * @brief Atomically decrements the value of the specified pointer points to\r
+ *\r
+ * @param[in,out] pAddend Pointer to memory location from where value is to be\r
+ * loaded and written back to.\r
+ *\r
+ * @return *pAddend value before decrement.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_Decrement_u32( uint32_t volatile * pAddend )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ return __atomic_fetch_sub(pAddend, 1, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+ uint32_t ulCurrent;\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ ulCurrent = *pAddend;\r
+\r
+ *pAddend -= 1;\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+ return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/*----------------------------- Bitwise Logical ------------------------------*/\r
+\r
+/**\r
+ * Atomic OR\r
+ *\r
+ * @brief Performs an atomic OR operation on the specified values.\r
+ *\r
+ * @param [in, out] pDestination Pointer to memory location from where value is\r
+ * to be loaded and written back to.\r
+ * @param [in] ulValue Value to be ORed with *pDestination.\r
+ *\r
+ * @return The original value of *pDestination.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_OR_u32(\r
+ uint32_t volatile * pDestination,\r
+ uint32_t ulValue )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ return __atomic_fetch_or(pDestination, ulValue, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+ uint32_t ulCurrent;\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ ulCurrent = *pDestination;\r
+\r
+ *pDestination |= ulValue;\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+ return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic AND\r
+ *\r
+ * @brief Performs an atomic AND operation on the specified values.\r
+ *\r
+ * @param [in, out] pDestination Pointer to memory location from where value is\r
+ * to be loaded and written back to.\r
+ * @param [in] ulValue Value to be ANDed with *pDestination.\r
+ *\r
+ * @return The original value of *pDestination.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_AND_u32(\r
+ uint32_t volatile * pDestination,\r
+ uint32_t ulValue )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ return __atomic_fetch_and(pDestination, ulValue, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+ uint32_t ulCurrent;\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ ulCurrent = *pDestination;\r
+\r
+ *pDestination &= ulValue;\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+ return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic NAND\r
+ *\r
+ * @brief Performs an atomic NAND operation on the specified values.\r
+ *\r
+ * @param [in, out] pDestination Pointer to memory location from where value is\r
+ * to be loaded and written back to.\r
+ * @param [in] ulValue Value to be NANDed with *pDestination.\r
+ *\r
+ * @return The original value of *pDestination.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_NAND_u32(\r
+ uint32_t volatile * pDestination,\r
+ uint32_t ulValue )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ return __atomic_fetch_nand(pDestination, ulValue, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+ uint32_t ulCurrent;\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ ulCurrent = *pDestination;\r
+\r
+ *pDestination = ~(ulCurrent & ulValue);\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+ return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic XOR\r
+ *\r
+ * @brief Performs an atomic XOR operation on the specified values.\r
+ *\r
+ * @param [in, out] pDestination Pointer to memory location from where value is\r
+ * to be loaded and written back to.\r
+ * @param [in] ulValue Value to be XORed with *pDestination.\r
+ *\r
+ * @return The original value of *pDestination.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_XOR_u32(\r
+ uint32_t volatile * pDestination,\r
+ uint32_t ulValue )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+ return __atomic_fetch_xor(pDestination, ulValue, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+ uint32_t ulCurrent;\r
+\r
+ ATOMIC_ENTER_CRITICAL();\r
+\r
+ ulCurrent = *pDestination;\r
+\r
+ *pDestination ^= ulValue;\r
+\r
+ ATOMIC_EXIT_CRITICAL();\r
+\r
+ return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+#ifdef __cplusplus\r
+}\r
+#endif\r
+\r
+#endif /* ATOMIC_H */\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pSubscribeOperation );\r
+ IotMqttOperation_t * const pSubscribeOperation );\r
/* @[declare_mqtt_subscribe] */\r
\r
/**\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pUnsubscribeOperation );\r
+ IotMqttOperation_t * const pUnsubscribeOperation );\r
/* @[declare_mqtt_unsubscribe] */\r
\r
/**\r
const IotMqttPublishInfo_t * pPublishInfo,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pPublishOperation );\r
+ IotMqttOperation_t * const pPublishOperation );\r
/* @[declare_mqtt_publish] */\r
\r
/**\r
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,\r
const char * pTopicFilter,\r
uint16_t topicFilterLength,\r
- IotMqttSubscription_t * pCurrentSubscription );\r
+ IotMqttSubscription_t * const pCurrentSubscription );\r
/* @[declare_mqtt_issubscribed] */\r
\r
#endif /* ifndef IOT_MQTT_H_ */\r
static void _mqttOperation_tryDestroy( void * pData );\r
\r
/**\r
- * @brief Create a keep-alive job for an MQTT connection.\r
+ * @brief Initialize the keep-alive operation for an MQTT connection.\r
*\r
* @param[in] pNetworkInfo User-provided network information for the new\r
* connection.\r
*\r
* @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
*/\r
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
- uint16_t keepAliveSeconds,\r
- _mqttConnection_t * pMqttConnection );\r
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ uint16_t keepAliveSeconds,\r
+ _mqttConnection_t * pMqttConnection );\r
\r
/**\r
* @brief Creates a new MQTT connection and initializes its members.\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pOperationReference );\r
+ IotMqttOperation_t * const pOperationReference );\r
\r
/*-----------------------------------------------------------*/\r
\r
\r
/*-----------------------------------------------------------*/\r
\r
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
- uint16_t keepAliveSeconds,\r
- _mqttConnection_t * pMqttConnection )\r
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ uint16_t keepAliveSeconds,\r
+ _mqttConnection_t * pMqttConnection )\r
{\r
bool status = true;\r
IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
IotMqttError_t ( * serializePingreq )( uint8_t **,\r
size_t * ) = _IotMqtt_SerializePingreq;\r
\r
+ /* Set PINGREQ operation members. */\r
+ pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;\r
+\r
/* Convert the keep-alive interval to milliseconds. */\r
- pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;\r
- pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;\r
+ pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;\r
\r
/* Choose a PINGREQ serializer function. */\r
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
\r
/* Generate a PINGREQ packet. */\r
- serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),\r
- &( pMqttConnection->pingreqPacketSize ) );\r
+ serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),\r
+ &( pMqttConnection->pingreq.u.operation.packetSize ) );\r
\r
if( serializeStatus != IOT_MQTT_SUCCESS )\r
{\r
/* Create the task pool job that processes keep-alive. */\r
jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
pMqttConnection,\r
- &( pMqttConnection->keepAliveJobStorage ),\r
- &( pMqttConnection->keepAliveJob ) );\r
+ &( pMqttConnection->pingreq.jobStorage ),\r
+ &( pMqttConnection->pingreq.job ) );\r
\r
/* Task pool job creation for a pre-allocated job should never fail.\r
* Abort the program if it does. */\r
/* Check if keep-alive is active for this connection. */\r
if( keepAliveSeconds != 0 )\r
{\r
- if( _createKeepAliveJob( pNetworkInfo,\r
- keepAliveSeconds,\r
- pMqttConnection ) == false )\r
+ if( _createKeepAliveOperation( pNetworkInfo,\r
+ keepAliveSeconds,\r
+ pMqttConnection ) == false )\r
{\r
IOT_SET_AND_GOTO_CLEANUP( false );\r
}\r
{\r
IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
\r
+ /* Default free packet function. */\r
+ void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;\r
+\r
/* Clean up keep-alive if still allocated. */\r
- if( pMqttConnection->keepAliveMs != 0 )\r
+ if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
{\r
IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
\r
- _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+ /* Choose a function to free the packet. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( pMqttConnection->pSerializer != NULL )\r
+ {\r
+ if( pMqttConnection->pSerializer->freePacket != NULL )\r
+ {\r
+ freePacket = pMqttConnection->pSerializer->freePacket;\r
+ }\r
+ }\r
+ #endif\r
+\r
+ freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
\r
/* Clear data about the keep-alive. */\r
- pMqttConnection->keepAliveMs = 0;\r
- pMqttConnection->pPingreqPacket = NULL;\r
- pMqttConnection->pingreqPacketSize = 0;\r
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
+ pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
+ pMqttConnection->pingreq.u.operation.packetSize = 0;\r
\r
/* Decrement reference count. */\r
pMqttConnection->references--;\r
/* A connection to be destroyed should have no keep-alive and at most 1\r
* reference. */\r
IotMqtt_Assert( pMqttConnection->references <= 1 );\r
- IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );\r
- IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );\r
- IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );\r
\r
/* Remove all subscriptions. */\r
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pOperationReference )\r
+ IotMqttOperation_t * const pOperationReference )\r
{\r
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
_mqttOperation_t * pSubscriptionOperation = NULL;\r
\r
/* Check the subscription operation data and set the operation type. */\r
IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
- IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );\r
+ IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );\r
pSubscriptionOperation->u.operation.type = operation;\r
\r
/* Generate a subscription packet from the subscription list. */\r
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
\r
/* Log initialization status. */\r
- if( status != IOT_MQTT_SUCCESS ) //_RB_ This will generate compiler warnings if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0\r
+ if( status != IOT_MQTT_SUCCESS )\r
{\r
IotLogError( "Failed to initialize MQTT library serializer. " );\r
}\r
_mqttConnection_t * pNewMqttConnection = NULL;\r
\r
/* Default CONNECT serializer function. */\r
- IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, //_RB_ Needs to be a typedef to make it easier to rease and more maintainable should the prototype change.\r
+ IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,\r
uint8_t **,\r
size_t * ) = _IotMqtt_SerializeConnect;\r
\r
}\r
\r
/* Validate network interface and connect info. */\r
- if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) //_RB_ A lot of code in here that could be replaced by asserts().\r
+ if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )\r
{\r
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
}\r
\r
IotLogInfo( "Establishing new MQTT connection." );\r
\r
- /* Initialize a new MQTT connection object. *///_RB_ Initialise, as per the comment, or create, as per the function name? I don't think this does create a connection as that happens below.\r
+ /* Initialize a new MQTT connection object. */\r
pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,\r
pNetworkInfo,\r
pConnectInfo->keepAliveSeconds );\r
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
== IOT_MQTT_FLAG_WAITABLE );\r
- IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
\r
/* Set the operation type. */\r
pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
\r
/* Add the CONNECT operation to the send queue for network transmission. */\r
- status = _IotMqtt_ScheduleOperation( pOperation, // Why schedule a job if going to wait for comletion?\r
+ status = _IotMqtt_ScheduleOperation( pOperation,\r
_IotMqtt_ProcessSend,\r
0 );\r
\r
if( status == IOT_MQTT_SUCCESS )\r
{\r
/* Check if a keep-alive job should be scheduled. */\r
- if( pNewMqttConnection->keepAliveMs != 0 )\r
+ if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
{\r
IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
\r
taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
- pNewMqttConnection->keepAliveJob,\r
- pNewMqttConnection->nextKeepAliveMs );\r
+ pNewMqttConnection->pingreq.job,\r
+ pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );\r
\r
if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
{\r
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
== IOT_MQTT_FLAG_WAITABLE );\r
- IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
\r
/* Set the operation type. */\r
pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pSubscribeOperation )\r
+ IotMqttOperation_t * const pSubscribeOperation )\r
{\r
return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
mqttConnection,\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pUnsubscribeOperation )\r
+ IotMqttOperation_t * const pUnsubscribeOperation )\r
{\r
return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
mqttConnection,\r
const IotMqttPublishInfo_t * pPublishInfo,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pPublishOperation )\r
+ IotMqttOperation_t * const pPublishOperation )\r
{\r
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
_mqttOperation_t * pOperation = NULL;\r
/* A QoS 0 PUBLISH may not be retried. */\r
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
{\r
- pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;\r
- pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;\r
+ pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;\r
+ pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;\r
}\r
else\r
{\r
/* Platform layer includes. */\r
#include "platform/iot_threads.h"\r
\r
+/* Atomics include. */\r
+#include "iot_atomic.h"\r
+\r
/*-----------------------------------------------------------*/\r
\r
/**\r
static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
uint16_t packetIdentifier );\r
\r
+/**\r
+ * @brief Flush a packet from the stream of incoming data.\r
+ *\r
+ * This function is called when memory for a packet cannot be allocated. The\r
+ * packet is flushed from the stream of incoming data so that the next packet\r
+ * may be read.\r
+ *\r
+ * @param[in] pNetworkConnection Network connection to use for receive, which\r
+ * may be different from the network connection associated with the MQTT connection.\r
+ * @param[in] pMqttConnection The associated MQTT connection.\r
+ * @param[in] length The length of the packet to flush.\r
+ */\r
+static void _flushPacket( void * pNetworkConnection,\r
+ const _mqttConnection_t * pMqttConnection,\r
+ size_t length );\r
+\r
/*-----------------------------------------------------------*/\r
\r
static bool _incomingPacketValid( uint8_t packetType )\r
\r
if( pIncomingPacket->pRemainingData == NULL )\r
{\r
+ IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "\r
+ "%lu for incoming packet type %lu.",\r
+ pMqttConnection,\r
+ ( unsigned long ) pIncomingPacket->remainingLength,\r
+ ( unsigned long ) pIncomingPacket->type );\r
+\r
+ _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );\r
+\r
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
}\r
else\r
\r
if( status == IOT_MQTT_SUCCESS )\r
{\r
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
-\r
- if( pMqttConnection->keepAliveFailure == false )\r
+ if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),\r
+ 0,\r
+ 1 ) == 1 )\r
{\r
- IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
- pMqttConnection );\r
+ IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
+ pMqttConnection );\r
}\r
else\r
{\r
- IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
- pMqttConnection );\r
-\r
- pMqttConnection->keepAliveFailure = false;\r
+ IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
+ pMqttConnection );\r
}\r
-\r
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
}\r
else\r
{\r
static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
uint16_t packetIdentifier )\r
{\r
- IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
- uint8_t * pPuback = NULL;\r
- size_t pubackSize = 0, bytesSent = 0;\r
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+ _mqttOperation_t * pPubackOperation = NULL;\r
\r
- /* Default PUBACK serializer and free packet functions. */\r
+ /* Default PUBACK serializer function. */\r
IotMqttError_t ( * serializePuback )( uint16_t,\r
uint8_t **,\r
size_t * ) = _IotMqtt_SerializePuback;\r
- void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
\r
IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",\r
pMqttConnection,\r
EMPTY_ELSE_MARKER;\r
}\r
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
- #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
- if( pMqttConnection->pSerializer != NULL )\r
- {\r
- if( pMqttConnection->pSerializer->freePacket != NULL )\r
- {\r
- freePacket = pMqttConnection->pSerializer->freePacket;\r
- }\r
- else\r
- {\r
- EMPTY_ELSE_MARKER;\r
- }\r
- }\r
- else\r
- {\r
- EMPTY_ELSE_MARKER;\r
- }\r
- #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ /* Create a PUBACK operation. */\r
+ status = _IotMqtt_CreateOperation( pMqttConnection,\r
+ 0,\r
+ NULL,\r
+ &pPubackOperation );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Set the operation type. */\r
+ pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;\r
\r
/* Generate a PUBACK packet from the packet identifier. */\r
- serializeStatus = serializePuback( packetIdentifier,\r
- &pPuback,\r
- &pubackSize );\r
+ status = serializePuback( packetIdentifier,\r
+ &( pPubackOperation->u.operation.pMqttPacket ),\r
+ &( pPubackOperation->u.operation.packetSize ) );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Add the PUBACK operation to the send queue for network transmission. */\r
+ status = _IotMqtt_ScheduleOperation( pPubackOperation,\r
+ _IotMqtt_ProcessSend,\r
+ 0 );\r
\r
- if( serializeStatus != IOT_MQTT_SUCCESS )\r
+ if( status != IOT_MQTT_SUCCESS )\r
{\r
- IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "\r
- "received PUBLISH %hu.",\r
- pMqttConnection,\r
- packetIdentifier );\r
+ IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",\r
+ pMqttConnection );\r
+\r
+ IOT_GOTO_CLEANUP();\r
}\r
else\r
{\r
- bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
- pPuback,\r
- pubackSize );\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Clean up on error. */\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
\r
- if( bytesSent != pubackSize )\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ if( pPubackOperation != NULL )\r
{\r
- IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"\r
- " PUBLISH %hu.",\r
- pMqttConnection,\r
- packetIdentifier );\r
+ _IotMqtt_DestroyOperation( pPubackOperation );\r
}\r
else\r
{\r
- IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",\r
- pMqttConnection,\r
- packetIdentifier );\r
+ EMPTY_ELSE_MARKER;\r
}\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
\r
- freePacket( pPuback );\r
+static void _flushPacket( void * pNetworkConnection,\r
+ const _mqttConnection_t * pMqttConnection,\r
+ size_t length )\r
+{\r
+ size_t bytesFlushed = 0;\r
+ uint8_t receivedByte = 0;\r
+\r
+ for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )\r
+ {\r
+ ( void ) _IotMqtt_GetNextByte( pNetworkConnection,\r
+ pMqttConnection->pNetworkInterface,\r
+ &receivedByte );\r
}\r
}\r
\r
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;\r
IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };\r
+ void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;\r
+\r
+ /* Disconnect callback function. */\r
+ void ( * disconnectCallback )( void *,\r
+ IotMqttCallbackParam_t * ) = NULL;\r
+\r
+ /* Network close function. */\r
+ IotNetworkError_t ( * closeConnection) ( void * ) = NULL;\r
+\r
+ /* Default free packet function. */\r
+ void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
\r
/* Mark the MQTT connection as disconnected and the keep-alive as failed. */\r
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
pMqttConnection->disconnected = true;\r
- pMqttConnection->keepAliveFailure = true;\r
\r
- if( pMqttConnection->keepAliveMs != 0 )\r
+ if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
{\r
/* Keep-alive must have a PINGREQ allocated. */\r
- IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );\r
- IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );\r
\r
/* PINGREQ provides a reference to the connection, so reference count must\r
* be nonzero. */\r
\r
/* Attempt to cancel the keep-alive job. */\r
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
- pMqttConnection->keepAliveJob,\r
+ pMqttConnection->pingreq.job,\r
NULL );\r
\r
/* If the keep-alive job was not canceled, it must be already executing.\r
* the executing keep-alive job will clean up itself. */\r
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
{\r
- /* Clean up PINGREQ packet and job. */\r
- _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+ /* Choose a function to free the packet. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( pMqttConnection->pSerializer != NULL )\r
+ {\r
+ if( pMqttConnection->pSerializer->freePacket != NULL )\r
+ {\r
+ freePacket = pMqttConnection->pSerializer->freePacket;\r
+ }\r
+ }\r
+ #endif\r
+\r
+ freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
\r
/* Clear data about the keep-alive. */\r
- pMqttConnection->keepAliveMs = 0;\r
- pMqttConnection->pPingreqPacket = NULL;\r
- pMqttConnection->pingreqPacketSize = 0;\r
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
+ pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
+ pMqttConnection->pingreq.u.operation.packetSize = 0;\r
\r
/* Keep-alive is cleaned up; decrement reference count. Since this\r
* function must be followed with a call to DISCONNECT, a check to\r
EMPTY_ELSE_MARKER;\r
}\r
\r
+ /* Copy the function pointers and contexts, as the MQTT connection may be\r
+ * modified after the mutex is released. */\r
+ disconnectCallback = pMqttConnection->disconnectCallback.function;\r
+ pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;\r
+\r
+ closeConnection = pMqttConnection->pNetworkInterface->close;\r
+ pNetworkConnection = pMqttConnection->pNetworkConnection;\r
+\r
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
\r
/* Close the network connection. */\r
- if( pMqttConnection->pNetworkInterface->close != NULL )\r
+ if( closeConnection != NULL )\r
{\r
- closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );\r
+ closeStatus = closeConnection( pNetworkConnection );\r
\r
if( closeStatus == IOT_NETWORK_SUCCESS )\r
{\r
}\r
\r
/* Invoke the disconnect callback. */\r
- if( pMqttConnection->disconnectCallback.function != NULL )\r
+ if( disconnectCallback != NULL )\r
{\r
/* Set the members of the callback parameter. */\r
callbackParam.mqttConnection = pMqttConnection;\r
callbackParam.u.disconnectReason = disconnectReason;\r
\r
- pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,\r
- &callbackParam );\r
+ disconnectCallback( pDisconnectCallbackContext,\r
+ &callbackParam );\r
}\r
else\r
{\r
#include "platform/iot_clock.h"\r
#include "platform/iot_threads.h"\r
\r
+/* Atomics include. */\r
+#include "iot_atomic.h"\r
+\r
/*-----------------------------------------------------------*/\r
\r
/**\r
static bool _checkRetryLimit( _mqttOperation_t * pOperation )\r
{\r
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
- bool status = true;\r
+ bool status = true, setDup = false;\r
\r
/* Choose a set DUP function. */\r
void ( * publishSetDup )( uint8_t *,\r
IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );\r
\r
/* Check if the retry limit is exhausted. */\r
- if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
+ if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit )\r
{\r
/* The retry count may be at most one more than the retry limit, which\r
* accounts for the final check for a PUBACK. */\r
- IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 );\r
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.count ==\r
+ pOperation->u.operation.periodic.retry.limit + 1 );\r
\r
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",\r
pMqttConnection,\r
pOperation,\r
- pOperation->u.operation.retry.limit );\r
+ pOperation->u.operation.periodic.retry.limit );\r
\r
status = false;\r
}\r
- /* Check if this is the first retry. */\r
- else if( pOperation->u.operation.retry.count == 1 )\r
- {\r
- /* Always set the DUP flag on the first retry. */\r
- publishSetDup( pOperation->u.operation.pMqttPacket,\r
- pOperation->u.operation.pPacketIdentifierHigh,\r
- &( pOperation->u.operation.packetIdentifier ) );\r
- }\r
else\r
{\r
- /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet\r
- * identifier) must be reset on every retry. */\r
- if( pMqttConnection->awsIotMqttMode == true )\r
+ if( pOperation->u.operation.periodic.retry.count == 1 )\r
+ {\r
+ /* The DUP flag should always be set on the first retry. */\r
+ setDup = true;\r
+ }\r
+ else if( pMqttConnection->awsIotMqttMode == true )\r
{\r
+ /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet\r
+ * identifier) must be reset on every retry. */\r
+ setDup = true;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( setDup == true )\r
+ {\r
+ /* In AWS IoT MQTT mode, the references mutex must be locked to\r
+ * prevent the packet identifier from being read while it is being\r
+ * changed. */\r
+ if( pMqttConnection->awsIotMqttMode == true )\r
+ {\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Always set the DUP flag on the first retry. */\r
publishSetDup( pOperation->u.operation.pMqttPacket,\r
pOperation->u.operation.pPacketIdentifierHigh,\r
&( pOperation->u.operation.packetIdentifier ) );\r
+\r
+ if( pMqttConnection->awsIotMqttMode == true )\r
+ {\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
}\r
else\r
{\r
\r
/* This function should never be called with retry count greater than\r
* retry limit. */\r
- IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit );\r
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <=\r
+ pOperation->u.operation.periodic.retry.limit );\r
\r
/* Increment the retry count. */\r
- ( pOperation->u.operation.retry.count )++;\r
+ ( pOperation->u.operation.periodic.retry.count )++;\r
\r
/* Check for a response shortly for the final retry. Otherwise, calculate the\r
* next retry period. */\r
- if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
+ if( pOperation->u.operation.periodic.retry.count >\r
+ pOperation->u.operation.periodic.retry.limit )\r
{\r
scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;\r
\r
}\r
else\r
{\r
- scheduleDelay = pOperation->u.operation.retry.nextPeriod;\r
+ scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs;\r
\r
/* Double the retry period, subject to a ceiling value. */\r
- pOperation->u.operation.retry.nextPeriod *= 2;\r
+ pOperation->u.operation.periodic.retry.nextPeriodMs *= 2;\r
\r
- if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING )\r
+ if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING )\r
{\r
- pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING;\r
+ pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING;\r
}\r
else\r
{\r
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",\r
pMqttConnection,\r
pOperation,\r
- ( unsigned long ) pOperation->u.operation.retry.count,\r
- ( unsigned long ) pOperation->u.operation.retry.limit,\r
+ ( unsigned long ) pOperation->u.operation.periodic.retry.count,\r
+ ( unsigned long ) pOperation->u.operation.periodic.retry.limit,\r
( unsigned long ) scheduleDelay );\r
\r
/* Check if this is the first retry. */\r
- firstRetry = ( pOperation->u.operation.retry.count == 1 );\r
+ firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 );\r
\r
/* On the first retry, the PUBLISH will be moved from the pending processing\r
* list to the pending responses list. Lock the connection references mutex\r
{\r
IotLogError( "Callback should not be set for a waitable operation." );\r
\r
- return IOT_MQTT_BAD_PARAMETER;\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
}\r
else\r
{\r
pMqttConnection,\r
IotMqtt_OperationType( pOperation->u.operation.type ),\r
pOperation,\r
- pOperation->u.operation.jobReference + 1,\r
- pOperation->u.operation.jobReference );\r
+ ( long ) ( pOperation->u.operation.jobReference + 1 ),\r
+ ( long ) ( pOperation->u.operation.jobReference ) );\r
\r
/* The job reference count must be 0 or 1 after the decrement. */\r
IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||\r
void * pContext )\r
{\r
bool status = true;\r
+ uint32_t swapStatus = 0;\r
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
size_t bytesSent = 0;\r
\r
+ /* Swap status is not checked when asserts are disabled. */\r
+ ( void ) swapStatus;\r
+\r
/* Retrieve the MQTT connection from the context. */\r
_mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;\r
+ _mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq );\r
\r
/* Check parameters. */\r
/* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
- IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob );\r
+ IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job );\r
\r
/* Check that keep-alive interval is valid. The MQTT spec states its maximum\r
* value is 65,535 seconds. */\r
- IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 );\r
+ IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 );\r
\r
/* Only two values are valid for the next keep alive job delay. */\r
- IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) ||\r
- ( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) );\r
+ IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==\r
+ pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) ||\r
+ ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs\r
+ == IOT_MQTT_RESPONSE_WAIT_MS ) );\r
\r
IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );\r
\r
&pKeepAliveJob );\r
IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
\r
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
-\r
/* Determine whether to send a PINGREQ or check for PINGRESP. */\r
- if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs )\r
+ if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==\r
+ pPingreqOperation->u.operation.periodic.ping.keepAliveMs )\r
{\r
IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );\r
\r
* more important than other operations. Bypass the queue of jobs for\r
* operations by directly sending the PINGREQ in this job. */\r
bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
- pMqttConnection->pPingreqPacket,\r
- pMqttConnection->pingreqPacketSize );\r
+ pPingreqOperation->u.operation.pMqttPacket,\r
+ pPingreqOperation->u.operation.packetSize );\r
\r
- if( bytesSent != pMqttConnection->pingreqPacketSize )\r
+ if( bytesSent != pPingreqOperation->u.operation.packetSize )\r
{\r
IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );\r
status = false;\r
{\r
/* Assume the keep-alive will fail. The network receive callback will\r
* clear the failure flag upon receiving a PINGRESP. */\r
- pMqttConnection->keepAliveFailure = true;\r
+ swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ),\r
+ 1,\r
+ 0 );\r
+ IotMqtt_Assert( swapStatus == 1 );\r
\r
/* Schedule a check for PINGRESP. */\r
- pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS;\r
+ pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS;\r
\r
IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",\r
pMqttConnection,\r
{\r
IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );\r
\r
- if( pMqttConnection->keepAliveFailure == false )\r
+ if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 )\r
{\r
IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );\r
\r
/* PINGRESP was received. Schedule the next PINGREQ transmission. */\r
- pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+ pPingreqOperation->u.operation.periodic.ping.nextPeriodMs =\r
+ pPingreqOperation->u.operation.periodic.ping.keepAliveMs;\r
}\r
else\r
{\r
{\r
taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,\r
pKeepAliveJob,\r
- pMqttConnection->nextKeepAliveMs );\r
+ pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );\r
\r
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
{\r
- IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.",\r
+ IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.",\r
pMqttConnection,\r
- IOT_MQTT_RESPONSE_WAIT_MS );\r
+ ( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );\r
}\r
else\r
{\r
{\r
EMPTY_ELSE_MARKER;\r
}\r
-\r
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
}\r
\r
/*-----------------------------------------------------------*/\r
}\r
else\r
{\r
+ /* This operation may have already been removed by cleanup of pending\r
+ * operations (called from Disconnect). In that case, do nothing here. */\r
EMPTY_ELSE_MARKER;\r
}\r
\r
_IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,\r
&callbackParam );\r
\r
- /* Free any buffers associated with the current PUBLISH message. */\r
- if( pOperation->u.publish.pReceivedData != NULL )\r
- {\r
- IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );\r
- }\r
- else\r
- {\r
- EMPTY_ELSE_MARKER;\r
- }\r
+ /* Free buffers associated with the current PUBLISH message. */\r
+ IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL );\r
+ IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );\r
\r
/* Free the incoming PUBLISH operation. */\r
IotMqtt_FreeOperation( pOperation );\r
waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
\r
/* Check PUBLISH retry counts and limits. */\r
- if( pOperation->u.operation.retry.limit > 0 )\r
+ if( pOperation->u.operation.periodic.retry.limit > 0 )\r
{\r
if( _checkRetryLimit( pOperation ) == false )\r
{\r
if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
{\r
/* Check if this operation should be scheduled for retransmission. */\r
- if( pOperation->u.operation.retry.limit > 0 )\r
+ if( pOperation->u.operation.periodic.retry.limit > 0 )\r
{\r
if( _scheduleNextRetry( pOperation ) == false )\r
{\r
\r
/* Check if the matched operation is a PUBLISH with retry. If it is, cancel\r
* the retry job. */\r
- if( pResult->u.operation.retry.limit > 0 )\r
+ if( pResult->u.operation.periodic.retry.limit > 0 )\r
{\r
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
pResult->job,\r
\r
if( pResult != NULL )\r
{\r
- IotLogDebug( "(MQTT connection %p) Found operation %s." ,\r
+ IotLogDebug( "(MQTT connection %p) Found operation %s.",\r
pMqttConnection,\r
IotMqtt_OperationType( type ) );\r
\r
* a packet identifer, but QoS 0 PUBLISH packets do not. */\r
if( pOutput->qos == IOT_MQTT_QOS_0 )\r
{\r
- pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) );\r
+ pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) );\r
pOutput->pPayload = pPacketIdentifierHigh;\r
}\r
else\r
{\r
- pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) );\r
+ pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) );\r
pOutput->pPayload = pPacketIdentifierHigh + sizeof( uint16_t );\r
}\r
\r
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,\r
const char * pTopicFilter,\r
uint16_t topicFilterLength,\r
- IotMqttSubscription_t * pCurrentSubscription )\r
+ IotMqttSubscription_t * const pCurrentSubscription )\r
{\r
bool status = false;\r
_mqttSubscription_t * pSubscription = NULL;\r
\r
topicMatchParams.pTopicName = pTopicFilter;\r
topicMatchParams.topicNameLength = topicFilterLength;\r
- topicMatchParams.exactMatchOnly = false;\r
+ topicMatchParams.exactMatchOnly = true;\r
\r
/* Prevent any other thread from modifying the subscription list while this\r
* function is running. */\r
/*---------------------- MQTT internal data structures ----------------------*/\r
\r
/**\r
- * @brief Represents an MQTT connection.\r
- */\r
-typedef struct _mqttConnection\r
-{\r
- bool awsIotMqttMode; /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */\r
- bool ownNetworkConnection; /**< @brief Whether this MQTT connection owns its network connection. */\r
- void * pNetworkConnection; /**< @brief References the transport-layer network connection. */\r
- const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */\r
- IotMqttCallbackInfo_t disconnectCallback; /**< @brief A function to invoke when this connection is disconnected. */\r
-\r
- #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
- const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */\r
- #endif\r
-\r
- bool disconnected; /**< @brief Tracks if this connection has been disconnected. */\r
- IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */\r
- int32_t references; /**< @brief Counts callbacks and operations using this connection. */\r
- IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */\r
- IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */\r
-\r
- IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */\r
- IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */\r
-\r
- bool keepAliveFailure; /**< @brief Failure flag for keep-alive operation. */\r
- uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */\r
- uint32_t nextKeepAliveMs; /**< @brief Relative delay for next keep-alive job. */\r
- IotTaskPoolJobStorage_t keepAliveJobStorage; /**< @brief Task pool job for processing this connection's keep-alive. */\r
- IotTaskPoolJob_t keepAliveJob; /**< @brief Task pool job for processing this connection's keep-alive. */\r
- uint8_t * pPingreqPacket; /**< @brief An MQTT PINGREQ packet, allocated if keep-alive is active. */\r
- size_t pingreqPacketSize; /**< @brief The size of an allocated PINGREQ packet. */\r
-} _mqttConnection_t;\r
-\r
-/**\r
- * @brief Represents a subscription stored in an MQTT connection.\r
+ * @cond DOXYGEN_IGNORE\r
+ * Doxygen should ignore this section.\r
+ *\r
+ * Forward declaration of MQTT connection type.\r
*/\r
-typedef struct _mqttSubscription\r
-{\r
- IotLink_t link; /**< @brief List link member. */\r
-\r
- int32_t references; /**< @brief How many subscription callbacks are using this subscription. */\r
-\r
- /**\r
- * @brief Tracks whether @ref mqtt_function_unsubscribe has been called for\r
- * this subscription.\r
- *\r
- * If there are active subscription callbacks, @ref mqtt_function_unsubscribe\r
- * cannot remove this subscription. Instead, it will set this flag, which\r
- * schedules the removal of this subscription once all subscription callbacks\r
- * terminate.\r
- */\r
- bool unsubscribed;\r
-\r
- struct\r
- {\r
- uint16_t identifier; /**< @brief Packet identifier. */\r
- size_t order; /**< @brief Order in the packet's list of subscriptions. */\r
- } packetInfo; /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */\r
-\r
- IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */\r
-\r
- uint16_t topicFilterLength; /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */\r
- char pTopicFilter[]; /**< @brief The subscription topic filter. */\r
-} _mqttSubscription_t;\r
+struct _mqttConnection;\r
+/** @endcond */\r
\r
/**\r
* @brief Internal structure representing a single MQTT operation, such as\r
typedef struct _mqttOperation\r
{\r
/* Pointers to neighboring queue elements. */\r
- IotLink_t link; /**< @brief List link member. */\r
+ IotLink_t link; /**< @brief List link member. */\r
\r
- bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */\r
- _mqttConnection_t * pMqttConnection; /**< @brief MQTT connection associated with this operation. */\r
+ bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */\r
+ struct _mqttConnection * pMqttConnection; /**< @brief MQTT connection associated with this operation. */\r
\r
- IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */\r
- IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */\r
+ IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */\r
+ IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */\r
\r
union\r
{\r
} notify; /**< @brief How to notify of this operation's completion. */\r
IotMqttError_t status; /**< @brief Result of this operation. This is reported once a response is received. */\r
\r
- struct\r
+ union\r
{\r
- uint32_t count;\r
- uint32_t limit;\r
- uint32_t nextPeriod;\r
- } retry;\r
+ struct\r
+ {\r
+ uint32_t count; /**< @brief Current number of retries. */\r
+ uint32_t limit; /**< @brief Maximum number of retries allowed. */\r
+ uint32_t nextPeriodMs; /**< @brief Next retry period. */\r
+ } retry; /**< @brief Additional information for PUBLISH retry. */\r
+\r
+ struct\r
+ {\r
+ uint32_t failure; /**< @brief Flag tracking keep-alive status. */\r
+ uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */\r
+ uint32_t nextPeriodMs; /**< @brief Relative delay for next keep-alive job. */\r
+ } ping; /**< @brief Additional information for keep-alive pings. */\r
+ } periodic; /**< @brief Additional information for periodic operations. */\r
} operation;\r
\r
/* If incomingPublish is true, this struct is valid. */\r
} u; /**< @brief Valid member depends on _mqttOperation_t.incomingPublish. */\r
} _mqttOperation_t;\r
\r
+/**\r
+ * @brief Represents an MQTT connection.\r
+ */\r
+typedef struct _mqttConnection\r
+{\r
+ bool awsIotMqttMode; /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */\r
+ bool ownNetworkConnection; /**< @brief Whether this MQTT connection owns its network connection. */\r
+ void * pNetworkConnection; /**< @brief References the transport-layer network connection. */\r
+ const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */\r
+ IotMqttCallbackInfo_t disconnectCallback; /**< @brief A function to invoke when this connection is disconnected. */\r
+\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */\r
+ #endif\r
+\r
+ bool disconnected; /**< @brief Tracks if this connection has been disconnected. */\r
+ IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */\r
+ int32_t references; /**< @brief Counts callbacks and operations using this connection. */\r
+ IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */\r
+ IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */\r
+\r
+ IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */\r
+ IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */\r
+\r
+ _mqttOperation_t pingreq; /**< @brief Operation used for MQTT keep-alive. */\r
+} _mqttConnection_t;\r
+\r
+/**\r
+ * @brief Represents a subscription stored in an MQTT connection.\r
+ */\r
+typedef struct _mqttSubscription\r
+{\r
+ IotLink_t link; /**< @brief List link member. */\r
+\r
+ int32_t references; /**< @brief How many subscription callbacks are using this subscription. */\r
+\r
+ /**\r
+ * @brief Tracks whether @ref mqtt_function_unsubscribe has been called for\r
+ * this subscription.\r
+ *\r
+ * If there are active subscription callbacks, @ref mqtt_function_unsubscribe\r
+ * cannot remove this subscription. Instead, it will set this flag, which\r
+ * schedules the removal of this subscription once all subscription callbacks\r
+ * terminate.\r
+ */\r
+ bool unsubscribed;\r
+\r
+ struct\r
+ {\r
+ uint16_t identifier; /**< @brief Packet identifier. */\r
+ size_t order; /**< @brief Order in the packet's list of subscriptions. */\r
+ } packetInfo; /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */\r
+\r
+ IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */\r
+\r
+ uint16_t topicFilterLength; /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */\r
+ char pTopicFilter[]; /**< @brief The subscription topic filter. */\r
+} _mqttSubscription_t;\r
+\r
/**\r
* @brief Represents an MQTT packet received from the network.\r
*\r