]> git.sur5r.net Git - freertos/commitdiff
Files as per 190725_FreeRTOS_IoT_Libs_Task_Pool_and_MQTT_Preview interim release.
authorrtel <rtel@1d2547de-c912-0410-9cb9-b8ca96c0e9e2>
Thu, 25 Jul 2019 20:20:24 +0000 (20:20 +0000)
committerrtel <rtel@1d2547de-c912-0410-9cb9-b8ca96c0e9e2>
Thu, 25 Jul 2019 20:20:24 +0000 (20:20 +0000)
git-svn-id: https://svn.code.sf.net/p/freertos/code/trunk@2708 1d2547de-c912-0410-9cb9-b8ca96c0e9e2

17 files changed:
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url [new file with mode: 0644]
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url [new file with mode: 0644]
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters
FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c
FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h [new file with mode: 0644]
FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h
FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c
FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h

diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url
new file mode 100644 (file)
index 0000000..8a5a861
--- /dev/null
@@ -0,0 +1,5 @@
+[{000214A0-0000-0000-C000-000000000046}]\r
+Prop3=19,11\r
+[InternetShortcut]\r
+IDList=\r
+URL=https://www.freertos.org/mqtt/\r
index dda52fb5b53ba60cd5d8c80e329d0b2a3e00ddc5..b922783189fe0fac5d070548c0fb9e89ed5e04ba 100644 (file)
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\private\iot_taskpool_internal.h" />\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\types\iot_taskpool_types.h" />\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt.h" />\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent.h" />\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent_config_defaults.h" />\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_config_defaults.h" />\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_lib.h" />\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\types\iot_mqtt_types.h" />\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOSIPConfigDefaults.h" />\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOS_ARP.h" />\r
index 4edf58197fefe0389223c7b1e76a1eb4611ce006..98cdff0ad5fc8352ce157934707b7352f6e0cfc8 100644 (file)
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt.h">\r
       <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
     </ClInclude>\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent.h">\r
-      <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
-    </ClInclude>\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent_config_defaults.h">\r
-      <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
-    </ClInclude>\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_config_defaults.h">\r
-      <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
-    </ClInclude>\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_lib.h">\r
-      <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>\r
-    </ClInclude>\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\types\iot_mqtt_types.h">\r
       <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include\types</Filter>\r
     </ClInclude>\r
index 213defd0358801c364880221f201d9ad938ecbb7..bedfd3e859a2f1405da42c34f19822f3c39984c7 100644 (file)
@@ -25,9 +25,9 @@
  * 1 tab == 4 spaces!\r
  */\r
 \r
-/*\r
- * TBD\r
- */\r
+ /***\r
+  * See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions.\r
 ***/\r
 \r
 /* Standard includes. */\r
 #include <stdio.h>\r
@@ -102,10 +102,9 @@ static UBaseType_t ulNextRand;
 \r
 int main( void )\r
 {\r
-       /*\r
-        * Instructions for using this project are provided on:\r
-        * TBD\r
-        */\r
+       /***\r
+        * See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions.\r
+        ***/\r
 \r
        /* Miscellaneous initialisation including preparing the logging and seeding\r
        the random number generator. */\r
index f1c7a42a4a96fcf3d15291f30c9ad105a92a02d4..fb2eaca261862777d12e8aeeda16f7a30ee3d407 100644 (file)
@@ -25,7 +25,6 @@
  * 1 tab == 4 spaces!\r
  */\r
 \r
-//_RB_ Add link to docs here.\r
 \r
 /* Kernel includes. */\r
 #include "FreeRTOS.h"\r
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url
new file mode 100644 (file)
index 0000000..c00147b
--- /dev/null
@@ -0,0 +1,6 @@
+[{000214A0-0000-0000-C000-000000000046}]\r
+Prop3=19,11\r
+[InternetShortcut]\r
+IDList=\r
+URL=https://www.freertos.org/task-pool/\r
+HotKey=0\r
index 4c7704c7089bb0a55b6deeae9209c9ad9894f8ad..7f9596ad36be20cbb2760155f2e057850e5a0722 100644 (file)
     <ClInclude Include="..\..\..\..\FreeRTOS\Source\include\task.h" />\r
     <ClInclude Include="..\..\..\..\FreeRTOS\Source\include\timers.h" />\r
     <ClInclude Include="..\..\..\..\FreeRTOS\Source\portable\MSVC-MingW\portmacro.h" />\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_afr.h" />\r
+    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_freertos.h" />\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\include\types\iot_platform_types.h" />\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\iot_taskpool.h" />\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\private\iot_error.h" />\r
index 4f426c81880cb810e9387c469e4c968454e4d664..3653a6cbe02ea732eeea0a06df629d5ba5cb5efa 100644 (file)
     </ClInclude>\r
     <ClInclude Include="iot_config.h" />\r
     <ClInclude Include="iot_config_common.h" />\r
-    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_afr.h">\r
-      <Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform</Filter>\r
-    </ClInclude>\r
     <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\include\types\iot_platform_types.h">\r
       <Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\include\types</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_freertos.h">\r
+      <Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform\types</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
index 7f42f9ab2cb81d834745038cd6b93bffd79ae82d..2cf6b382910edb3b49e8cef10622a1aa99881158 100644 (file)
  * 1 tab == 4 spaces!\r
  */\r
 \r
+       /***\r
+        * See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions.\r
+        ***/\r
+\r
+\r
 /* Standard includes. */\r
 #include <stdio.h>\r
 #include <time.h>\r
@@ -55,10 +60,9 @@ const uint8_t ucMACAddress[ 6 ] = { configMAC_ADDR0, configMAC_ADDR1, configMAC_
 \r
 int main( void )\r
 {\r
-       /*\r
-        * Instructions for using this project are provided on:\r
-        * TBD\r
-        */\r
+       /***\r
+        * See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions.\r
+        ***/\r
 \r
        /* Create the example that demonstrates task pool functionality.  Examples\r
        that demonstrate networking connectivity will be added in future projects\r
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h
new file mode 100644 (file)
index 0000000..d44593d
--- /dev/null
@@ -0,0 +1,547 @@
+/*\r
+ * FreeRTOS Kernel V10.2.0\r
+ * Copyright (C) 2019 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ *\r
+ * http://www.FreeRTOS.org\r
+ * http://aws.amazon.com/freertos\r
+ *\r
+ * 1 tab == 4 spaces!\r
+ */\r
+\r
+/**\r
+ * @file atomic.h\r
+ * @brief FreeRTOS atomic operation support.\r
+ *\r
+ * Two implementations of atomic are given in this header file:\r
+ * 1. Disabling interrupt globally.\r
+ * 2. ISA native atomic support.\r
+ * The former is available to all ports (compiler-architecture combination),\r
+ * while the latter is only available to ports compiling with GCC (version at\r
+ * least 4.7.0), which also have ISA atomic support.\r
+ *\r
+ * User can select which implementation to use by:\r
+ * setting/clearing configUSE_ATOMIC_INSTRUCTION in FreeRTOSConfig.h.\r
+ * Define AND set configUSE_ATOMIC_INSTRUCTION to 1 for ISA native atomic support.\r
+ * Undefine OR clear configUSE_ATOMIC_INSTRUCTION for disabling global interrupt\r
+ * implementation.\r
+ *\r
+ * @see GCC Built-in Functions for Memory Model Aware Atomic Operations\r
+ *      https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html\r
+ */\r
+\r
+#ifndef ATOMIC_H\r
+#define ATOMIC_H\r
+\r
+#ifndef INC_FREERTOS_H\r
+    #error "include FreeRTOS.h must appear in source files before include atomic.h"\r
+#endif\r
+\r
+/* Standard includes. */\r
+#include <stdint.h>\r
+\r
+#ifdef __cplusplus\r
+extern "C" {\r
+#endif\r
+\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    /* Needed for __atomic_compare_exchange() weak=false. */\r
+    #include <stdbool.h>\r
+\r
+    /* This branch is for GCC compiler and GCC compiler only. */\r
+    #ifndef portFORCE_INLINE\r
+        #define portFORCE_INLINE  inline __attribute__((always_inline))\r
+    #endif\r
+\r
+#else\r
+\r
+    /* Port specific definitions -- entering/exiting critical section.\r
+     * Refer template -- ./lib/FreeRTOS/portable/Compiler/Arch/portmacro.h\r
+     *\r
+     * Every call to ATOMIC_EXIT_CRITICAL() must be closely paired with\r
+     * ATOMIC_ENTER_CRITICAL().\r
+     */\r
+    #if defined( portSET_INTERRUPT_MASK_FROM_ISR )\r
+\r
+        /* Nested interrupt scheme is supported in this port. */\r
+        #define ATOMIC_ENTER_CRITICAL()     \\r
+            UBaseType_t uxCriticalSectionType = portSET_INTERRUPT_MASK_FROM_ISR()\r
+\r
+        #define ATOMIC_EXIT_CRITICAL()      \\r
+            portCLEAR_INTERRUPT_MASK_FROM_ISR( uxCriticalSectionType )\r
+\r
+    #else\r
+\r
+        /* Nested interrupt scheme is NOT supported in this port. */\r
+        #define ATOMIC_ENTER_CRITICAL()     portENTER_CRITICAL()\r
+        #define ATOMIC_EXIT_CRITICAL()      portEXIT_CRITICAL()\r
+\r
+    #endif /* portSET_INTERRUPT_MASK_FROM_ISR() */\r
+\r
+    /* Port specific definition -- "always inline". \r
+     * Inline is compiler specific, and may not always get inlined depending on your optimization level. \r
+     * Also, inline is considerred as performance optimization for atomic. \r
+     * Thus, if portFORCE_INLINE is not provided by portmacro.h, instead of resulting error,\r
+     * simply define it. \r
+     */\r
+    #ifndef portFORCE_INLINE\r
+        #define portFORCE_INLINE \r
+    #endif\r
+\r
+#endif /* configUSE_GCC_BUILTIN_ATOMICS */\r
+\r
+#define ATOMIC_COMPARE_AND_SWAP_SUCCESS     0x1U        /**< Compare and swap succeeded, swapped. */\r
+#define ATOMIC_COMPARE_AND_SWAP_FAILURE     0x0U        /**< Compare and swap failed, did not swap. */\r
+\r
+/*----------------------------- Swap && CAS ------------------------------*/\r
+\r
+/**\r
+ * Atomic compare-and-swap\r
+ *\r
+ * @brief Performs an atomic compare-and-swap operation on the specified values.\r
+ *\r
+ * @param[in, out] pDestination  Pointer to memory location from where value is\r
+ *                               to be loaded and checked.\r
+ * @param[in] ulExchange         If condition meets, write this value to memory.\r
+ * @param[in] ulComparand        Swap condition.\r
+ *\r
+ * @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped.\r
+ *\r
+ * @note This function only swaps *pDestination with ulExchange, if previous\r
+ *       *pDestination value equals ulComparand.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_CompareAndSwap_u32(\r
+        uint32_t volatile * pDestination,\r
+        uint32_t ulExchange,\r
+        uint32_t ulComparand )\r
+{\r
+\r
+    uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE;\r
+\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    if ( __atomic_compare_exchange( pDestination,\r
+                                    &ulComparand,\r
+                                    &ulExchange,\r
+                                    false,\r
+                                    __ATOMIC_SEQ_CST,\r
+                                    __ATOMIC_SEQ_CST ) )\r
+    {\r
+        ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;\r
+    }\r
+\r
+#else\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    if ( *pDestination == ulComparand )\r
+    {\r
+        *pDestination = ulExchange;\r
+        ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;\r
+    }\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+#endif\r
+\r
+    return ulReturnValue;\r
+\r
+}\r
+\r
+/**\r
+ * Atomic swap (pointers)\r
+ *\r
+ * @brief Atomically sets the address pointed to by *ppDestination to the value\r
+ *        of *pExchange.\r
+ *\r
+ * @param[in, out] ppDestination  Pointer to memory location from where a pointer\r
+ *                                value is to be loaded and written back to.\r
+ * @param[in] pExchange           Pointer value to be written to *ppDestination.\r
+ *\r
+ * @return The initial value of *ppDestination.\r
+ */\r
+static portFORCE_INLINE void * Atomic_SwapPointers_p32(\r
+        void * volatile * ppDestination,\r
+        void * pExchange )\r
+{\r
+    void * pReturnValue;\r
+\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    __atomic_exchange( ppDestination, &pExchange, &pReturnValue, __ATOMIC_SEQ_CST );\r
+\r
+#else\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    pReturnValue = *ppDestination;\r
+\r
+    *ppDestination = pExchange;\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+#endif\r
+\r
+    return pReturnValue;\r
+}\r
+\r
+/**\r
+ * Atomic compare-and-swap (pointers)\r
+ *\r
+ * @brief Performs an atomic compare-and-swap operation on the specified pointer\r
+ *        values.\r
+ *\r
+ * @param[in, out] ppDestination  Pointer to memory location from where a pointer\r
+ *                                value is to be loaded and checked.\r
+ * @param[in] pExchange           If condition meets, write this value to memory.\r
+ * @param[in] pComparand          Swap condition.\r
+ *\r
+ * @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped.\r
+ *\r
+ * @note This function only swaps *ppDestination with pExchange, if previous\r
+ *       *ppDestination value equals pComparand.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_CompareAndSwapPointers_p32(\r
+        void * volatile * ppDestination,\r
+        void * pExchange, void * pComparand )\r
+{\r
+    uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE;\r
+\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+    if ( __atomic_compare_exchange( ppDestination,\r
+                                    &pComparand,\r
+                                    &pExchange,\r
+                                    false,\r
+                                    __ATOMIC_SEQ_CST,\r
+                                    __ATOMIC_SEQ_CST ) )\r
+    {\r
+        ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;\r
+    }\r
+\r
+#else\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    if ( *ppDestination == pComparand )\r
+    {\r
+        *ppDestination = pExchange;\r
+        ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;\r
+    }\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+#endif\r
+\r
+    return ulReturnValue;\r
+}\r
+\r
+\r
+/*----------------------------- Arithmetic ------------------------------*/\r
+\r
+/**\r
+ * Atomic add\r
+ *\r
+ * @brief Atomically adds count to the value of the specified pointer points to.\r
+ *\r
+ * @param[in,out] pAddend  Pointer to memory location from where value is to be\r
+ *                         loaded and written back to.\r
+ * @param[in] ulCount      Value to be added to *pAddend.\r
+ *\r
+ * @return previous *pAddend value.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_Add_u32(\r
+        uint32_t volatile * pAddend,\r
+        uint32_t ulCount )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    return __atomic_fetch_add(pAddend, ulCount, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+    uint32_t ulCurrent;\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    ulCurrent = *pAddend;\r
+\r
+    *pAddend += ulCount;\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+    return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic subtract\r
+ *\r
+ * @brief Atomically subtracts count from the value of the specified pointer\r
+ *        pointers to.\r
+ *\r
+ * @param[in,out] pAddend  Pointer to memory location from where value is to be\r
+ *                         loaded and written back to.\r
+ * @param[in] ulCount      Value to be subtract from *pAddend.\r
+ *\r
+ * @return previous *pAddend value.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_Subtract_u32(\r
+        uint32_t volatile * pAddend,\r
+        uint32_t ulCount )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    return __atomic_fetch_sub(pAddend, ulCount, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+    uint32_t ulCurrent;\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    ulCurrent = *pAddend;\r
+\r
+    *pAddend -= ulCount;\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+    return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic increment\r
+ *\r
+ * @brief Atomically increments the value of the specified pointer points to.\r
+ *\r
+ * @param[in,out] pAddend  Pointer to memory location from where value is to be\r
+ *                         loaded and written back to.\r
+ *\r
+ * @return *pAddend value before increment.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_Increment_u32( uint32_t volatile * pAddend )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    return __atomic_fetch_add(pAddend, 1, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+    uint32_t ulCurrent;\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    ulCurrent = *pAddend;\r
+\r
+    *pAddend += 1;\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+    return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic decrement\r
+ *\r
+ * @brief Atomically decrements the value of the specified pointer points to\r
+ *\r
+ * @param[in,out] pAddend  Pointer to memory location from where value is to be\r
+ *                         loaded and written back to.\r
+ *\r
+ * @return *pAddend value before decrement.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_Decrement_u32( uint32_t volatile * pAddend )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    return __atomic_fetch_sub(pAddend, 1, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+    uint32_t ulCurrent;\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    ulCurrent = *pAddend;\r
+\r
+    *pAddend -= 1;\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+    return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/*----------------------------- Bitwise Logical ------------------------------*/\r
+\r
+/**\r
+ * Atomic OR\r
+ *\r
+ * @brief Performs an atomic OR operation on the specified values.\r
+ *\r
+ * @param [in, out] pDestination  Pointer to memory location from where value is\r
+ *                                to be loaded and written back to.\r
+ * @param [in] ulValue            Value to be ORed with *pDestination.\r
+ *\r
+ * @return The original value of *pDestination.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_OR_u32(\r
+        uint32_t volatile * pDestination,\r
+        uint32_t ulValue )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    return __atomic_fetch_or(pDestination, ulValue, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+    uint32_t ulCurrent;\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    ulCurrent = *pDestination;\r
+\r
+    *pDestination |= ulValue;\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+    return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic AND\r
+ *\r
+ * @brief Performs an atomic AND operation on the specified values.\r
+ *\r
+ * @param [in, out] pDestination  Pointer to memory location from where value is\r
+ *                                to be loaded and written back to.\r
+ * @param [in] ulValue            Value to be ANDed with *pDestination.\r
+ *\r
+ * @return The original value of *pDestination.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_AND_u32(\r
+        uint32_t volatile * pDestination,\r
+        uint32_t ulValue )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    return __atomic_fetch_and(pDestination, ulValue, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+    uint32_t ulCurrent;\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    ulCurrent = *pDestination;\r
+\r
+    *pDestination &= ulValue;\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+    return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic NAND\r
+ *\r
+ * @brief Performs an atomic NAND operation on the specified values.\r
+ *\r
+ * @param [in, out] pDestination  Pointer to memory location from where value is\r
+ *                                to be loaded and written back to.\r
+ * @param [in] ulValue            Value to be NANDed with *pDestination.\r
+ *\r
+ * @return The original value of *pDestination.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_NAND_u32(\r
+        uint32_t volatile * pDestination,\r
+        uint32_t ulValue )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    return __atomic_fetch_nand(pDestination, ulValue, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+    uint32_t ulCurrent;\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    ulCurrent = *pDestination;\r
+\r
+    *pDestination = ~(ulCurrent & ulValue);\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+    return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+/**\r
+ * Atomic XOR\r
+ *\r
+ * @brief Performs an atomic XOR operation on the specified values.\r
+ *\r
+ * @param [in, out] pDestination  Pointer to memory location from where value is\r
+ *                                to be loaded and written back to.\r
+ * @param [in] ulValue            Value to be XORed with *pDestination.\r
+ *\r
+ * @return The original value of *pDestination.\r
+ */\r
+static portFORCE_INLINE uint32_t Atomic_XOR_u32(\r
+        uint32_t volatile * pDestination,\r
+        uint32_t ulValue )\r
+{\r
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )\r
+\r
+    return __atomic_fetch_xor(pDestination, ulValue, __ATOMIC_SEQ_CST);\r
+\r
+#else\r
+\r
+    uint32_t ulCurrent;\r
+\r
+    ATOMIC_ENTER_CRITICAL();\r
+\r
+    ulCurrent = *pDestination;\r
+\r
+    *pDestination ^= ulValue;\r
+\r
+    ATOMIC_EXIT_CRITICAL();\r
+\r
+    return ulCurrent;\r
+\r
+#endif\r
+}\r
+\r
+#ifdef __cplusplus\r
+}\r
+#endif\r
+\r
+#endif /* ATOMIC_H */\r
index 327784a3e6b1b21eec65567345ceea714cd5818a..dad106a68ab2d40d37309f1f9075b42a0edcbf74 100644 (file)
@@ -427,7 +427,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
                                   size_t subscriptionCount,\r
                                   uint32_t flags,\r
                                   const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                  IotMqttOperation_t * pSubscribeOperation );\r
+                                  IotMqttOperation_t * const pSubscribeOperation );\r
 /* @[declare_mqtt_subscribe] */\r
 \r
 /**\r
@@ -512,7 +512,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
                                     size_t subscriptionCount,\r
                                     uint32_t flags,\r
                                     const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                    IotMqttOperation_t * pUnsubscribeOperation );\r
+                                    IotMqttOperation_t * const pUnsubscribeOperation );\r
 /* @[declare_mqtt_unsubscribe] */\r
 \r
 /**\r
@@ -642,7 +642,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
                                 const IotMqttPublishInfo_t * pPublishInfo,\r
                                 uint32_t flags,\r
                                 const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                IotMqttOperation_t * pPublishOperation );\r
+                                IotMqttOperation_t * const pPublishOperation );\r
 /* @[declare_mqtt_publish] */\r
 \r
 /**\r
@@ -817,7 +817,7 @@ const char * IotMqtt_OperationType( IotMqttOperationType_t operation );
 bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,\r
                            const char * pTopicFilter,\r
                            uint16_t topicFilterLength,\r
-                           IotMqttSubscription_t * pCurrentSubscription );\r
+                           IotMqttSubscription_t * const pCurrentSubscription );\r
 /* @[declare_mqtt_issubscribed] */\r
 \r
 #endif /* ifndef IOT_MQTT_H_ */\r
index 23133f46b78776713c4a39d5f1088ae7e5038fc0..0d6a259ed651c5dcfd3facf0b632990a0109a462 100644 (file)
@@ -94,7 +94,7 @@ static void _mqttSubscription_tryDestroy( void * pData );
 static void _mqttOperation_tryDestroy( void * pData );\r
 \r
 /**\r
- * @brief Create a keep-alive job for an MQTT connection.\r
+ * @brief Initialize the keep-alive operation for an MQTT connection.\r
  *\r
  * @param[in] pNetworkInfo User-provided network information for the new\r
  * connection.\r
@@ -103,9 +103,9 @@ static void _mqttOperation_tryDestroy( void * pData );
  *\r
  * @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
  */\r
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
-                                 uint16_t keepAliveSeconds,\r
-                                 _mqttConnection_t * pMqttConnection );\r
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                       uint16_t keepAliveSeconds,\r
+                                       _mqttConnection_t * pMqttConnection );\r
 \r
 /**\r
  * @brief Creates a new MQTT connection and initializes its members.\r
@@ -141,7 +141,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
                                            size_t subscriptionCount,\r
                                            uint32_t flags,\r
                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                           IotMqttOperation_t * pOperationReference );\r
+                                           IotMqttOperation_t * const pOperationReference );\r
 \r
 /*-----------------------------------------------------------*/\r
 \r
@@ -238,9 +238,9 @@ static void _mqttOperation_tryDestroy( void * pData )
 \r
 /*-----------------------------------------------------------*/\r
 \r
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
-                                 uint16_t keepAliveSeconds,\r
-                                 _mqttConnection_t * pMqttConnection )\r
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                       uint16_t keepAliveSeconds,\r
+                                       _mqttConnection_t * pMqttConnection )\r
 {\r
     bool status = true;\r
     IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
@@ -253,9 +253,12 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
     IotMqttError_t ( * serializePingreq )( uint8_t **,\r
                                            size_t * ) = _IotMqtt_SerializePingreq;\r
 \r
+    /* Set PINGREQ operation members. */\r
+    pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;\r
+\r
     /* Convert the keep-alive interval to milliseconds. */\r
-    pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;\r
-    pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+    pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;\r
+    pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;\r
 \r
     /* Choose a PINGREQ serializer function. */\r
     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
@@ -277,8 +280,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
 \r
     /* Generate a PINGREQ packet. */\r
-    serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),\r
-                                        &( pMqttConnection->pingreqPacketSize ) );\r
+    serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),\r
+                                        &( pMqttConnection->pingreq.u.operation.packetSize ) );\r
 \r
     if( serializeStatus != IOT_MQTT_SUCCESS )\r
     {\r
@@ -291,8 +294,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
         /* Create the task pool job that processes keep-alive. */\r
         jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
                                            pMqttConnection,\r
-                                           &( pMqttConnection->keepAliveJobStorage ),\r
-                                           &( pMqttConnection->keepAliveJob ) );\r
+                                           &( pMqttConnection->pingreq.jobStorage ),\r
+                                           &( pMqttConnection->pingreq.job ) );\r
 \r
         /* Task pool job creation for a pre-allocated job should never fail.\r
          * Abort the program if it does. */\r
@@ -408,9 +411,9 @@ static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
     /* Check if keep-alive is active for this connection. */\r
     if( keepAliveSeconds != 0 )\r
     {\r
-        if( _createKeepAliveJob( pNetworkInfo,\r
-                                 keepAliveSeconds,\r
-                                 pMqttConnection ) == false )\r
+        if( _createKeepAliveOperation( pNetworkInfo,\r
+                                       keepAliveSeconds,\r
+                                       pMqttConnection ) == false )\r
         {\r
             IOT_SET_AND_GOTO_CLEANUP( false );\r
         }\r
@@ -471,17 +474,31 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
 {\r
     IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
 \r
+    /* Default free packet function. */\r
+    void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;\r
+\r
     /* Clean up keep-alive if still allocated. */\r
-    if( pMqttConnection->keepAliveMs != 0 )\r
+    if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
     {\r
         IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
 \r
-        _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+        /* Choose a function to free the packet. */\r
+        #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+            if( pMqttConnection->pSerializer != NULL )\r
+            {\r
+                if( pMqttConnection->pSerializer->freePacket != NULL )\r
+                {\r
+                    freePacket = pMqttConnection->pSerializer->freePacket;\r
+                }\r
+            }\r
+        #endif\r
+\r
+        freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
 \r
         /* Clear data about the keep-alive. */\r
-        pMqttConnection->keepAliveMs = 0;\r
-        pMqttConnection->pPingreqPacket = NULL;\r
-        pMqttConnection->pingreqPacketSize = 0;\r
+        pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
+        pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
+        pMqttConnection->pingreq.u.operation.packetSize = 0;\r
 \r
         /* Decrement reference count. */\r
         pMqttConnection->references--;\r
@@ -494,9 +511,9 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
     /* A connection to be destroyed should have no keep-alive and at most 1\r
      * reference. */\r
     IotMqtt_Assert( pMqttConnection->references <= 1 );\r
-    IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );\r
-    IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );\r
-    IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );\r
+    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );\r
+    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );\r
+    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );\r
 \r
     /* Remove all subscriptions. */\r
     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
@@ -546,7 +563,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
                                            size_t subscriptionCount,\r
                                            uint32_t flags,\r
                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                           IotMqttOperation_t * pOperationReference )\r
+                                           IotMqttOperation_t * const pOperationReference )\r
 {\r
     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
     _mqttOperation_t * pSubscriptionOperation = NULL;\r
@@ -666,7 +683,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
 \r
     /* Check the subscription operation data and set the operation type. */\r
     IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
-    IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );\r
+    IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );\r
     pSubscriptionOperation->u.operation.type = operation;\r
 \r
     /* Generate a subscription packet from the subscription list. */\r
@@ -853,7 +870,7 @@ IotMqttError_t IotMqtt_Init( void )
     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
 \r
     /* Log initialization status. */\r
-    if( status != IOT_MQTT_SUCCESS ) //_RB_ This will generate compiler warnings if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0\r
+    if( status != IOT_MQTT_SUCCESS )\r
     {\r
         IotLogError( "Failed to initialize MQTT library serializer. " );\r
     }\r
@@ -896,7 +913,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
     _mqttConnection_t * pNewMqttConnection = NULL;\r
 \r
     /* Default CONNECT serializer function. */\r
-    IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, //_RB_ Needs to be a typedef to make it easier to rease and more maintainable should the prototype change.\r
+    IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,\r
                                            uint8_t **,\r
                                            size_t * ) = _IotMqtt_SerializeConnect;\r
 \r
@@ -911,7 +928,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
     }\r
 \r
     /* Validate network interface and connect info. */\r
-    if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) //_RB_ A lot of code in here that could be replaced by asserts().\r
+    if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )\r
     {\r
         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
     }\r
@@ -1002,7 +1019,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
 \r
     IotLogInfo( "Establishing new MQTT connection." );\r
 \r
-    /* Initialize a new MQTT connection object. *///_RB_ Initialise, as per the comment, or create, as per the function name?  I don't think this does create a connection as that happens below.\r
+    /* Initialize a new MQTT connection object. */\r
     pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,\r
                                                 pNetworkInfo,\r
                                                 pConnectInfo->keepAliveSeconds );\r
@@ -1059,7 +1076,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
     IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
                     == IOT_MQTT_FLAG_WAITABLE );\r
-    IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+    IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
 \r
     /* Set the operation type. */\r
     pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
@@ -1127,7 +1144,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
     IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
 \r
     /* Add the CONNECT operation to the send queue for network transmission. */\r
-    status = _IotMqtt_ScheduleOperation( pOperation, // Why schedule a job if going to wait for comletion?\r
+    status = _IotMqtt_ScheduleOperation( pOperation,\r
                                          _IotMqtt_ProcessSend,\r
                                          0 );\r
 \r
@@ -1150,13 +1167,13 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
     if( status == IOT_MQTT_SUCCESS )\r
     {\r
         /* Check if a keep-alive job should be scheduled. */\r
-        if( pNewMqttConnection->keepAliveMs != 0 )\r
+        if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
         {\r
             IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
 \r
             taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
-                                                           pNewMqttConnection->keepAliveJob,\r
-                                                           pNewMqttConnection->nextKeepAliveMs );\r
+                                                           pNewMqttConnection->pingreq.job,\r
+                                                           pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );\r
 \r
             if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
             {\r
@@ -1268,7 +1285,7 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
                 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
                 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
                                 == IOT_MQTT_FLAG_WAITABLE );\r
-                IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+                IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
 \r
                 /* Set the operation type. */\r
                 pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
@@ -1389,7 +1406,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
                                   size_t subscriptionCount,\r
                                   uint32_t flags,\r
                                   const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                  IotMqttOperation_t * pSubscribeOperation )\r
+                                  IotMqttOperation_t * const pSubscribeOperation )\r
 {\r
     return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
                                 mqttConnection,\r
@@ -1445,7 +1462,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
                                     size_t subscriptionCount,\r
                                     uint32_t flags,\r
                                     const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                    IotMqttOperation_t * pUnsubscribeOperation )\r
+                                    IotMqttOperation_t * const pUnsubscribeOperation )\r
 {\r
     return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
                                 mqttConnection,\r
@@ -1500,7 +1517,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
                                 const IotMqttPublishInfo_t * pPublishInfo,\r
                                 uint32_t flags,\r
                                 const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                IotMqttOperation_t * pPublishOperation )\r
+                                IotMqttOperation_t * const pPublishOperation )\r
 {\r
     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
     _mqttOperation_t * pOperation = NULL;\r
@@ -1651,8 +1668,8 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
         /* A QoS 0 PUBLISH may not be retried. */\r
         if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
         {\r
-            pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;\r
-            pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;\r
+            pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;\r
+            pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;\r
         }\r
         else\r
         {\r
index 169a292df791479bd14cad9e48514ad49929348b..42843d28c5e2809c323816e0641037705f857728 100644 (file)
@@ -43,6 +43,9 @@
 /* Platform layer includes. */\r
 #include "platform/iot_threads.h"\r
 \r
+/* Atomics include. */\r
+#include "iot_atomic.h"\r
+\r
 /*-----------------------------------------------------------*/\r
 \r
 /**\r
@@ -89,6 +92,22 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
 static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
                          uint16_t packetIdentifier );\r
 \r
+/**\r
+ * @brief Flush a packet from the stream of incoming data.\r
+ *\r
+ * This function is called when memory for a packet cannot be allocated. The\r
+ * packet is flushed from the stream of incoming data so that the next packet\r
+ * may be read.\r
+ *\r
+ * @param[in] pNetworkConnection Network connection to use for receive, which\r
+ * may be different from the network connection associated with the MQTT connection.\r
+ * @param[in] pMqttConnection The associated MQTT connection.\r
+ * @param[in] length The length of the packet to flush.\r
+ */\r
+static void _flushPacket( void * pNetworkConnection,\r
+                          const _mqttConnection_t * pMqttConnection,\r
+                          size_t length );\r
+\r
 /*-----------------------------------------------------------*/\r
 \r
 static bool _incomingPacketValid( uint8_t packetType )\r
@@ -201,6 +220,14 @@ static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
 \r
         if( pIncomingPacket->pRemainingData == NULL )\r
         {\r
+            IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "\r
+                         "%lu for incoming packet type %lu.",\r
+                         pMqttConnection,\r
+                         ( unsigned long ) pIncomingPacket->remainingLength,\r
+                         ( unsigned long ) pIncomingPacket->type );\r
+\r
+            _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );\r
+\r
             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
         }\r
         else\r
@@ -593,22 +620,18 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
 \r
             if( status == IOT_MQTT_SUCCESS )\r
             {\r
-                IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
-\r
-                if( pMqttConnection->keepAliveFailure == false )\r
+                if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),\r
+                                               0,\r
+                                               1 ) == 1 )\r
                 {\r
-                    IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
-                                pMqttConnection );\r
+                    IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
+                                 pMqttConnection );\r
                 }\r
                 else\r
                 {\r
-                    IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
-                                 pMqttConnection );\r
-\r
-                    pMqttConnection->keepAliveFailure = false;\r
+                    IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
+                                pMqttConnection );\r
                 }\r
-\r
-                IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
             }\r
             else\r
             {\r
@@ -637,15 +660,13 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
 static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
                          uint16_t packetIdentifier )\r
 {\r
-    IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
-    uint8_t * pPuback = NULL;\r
-    size_t pubackSize = 0, bytesSent = 0;\r
+    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+    _mqttOperation_t * pPubackOperation = NULL;\r
 \r
-    /* Default PUBACK serializer and free packet functions. */\r
+    /* Default PUBACK serializer function. */\r
     IotMqttError_t ( * serializePuback )( uint16_t,\r
                                           uint8_t **,\r
                                           size_t * ) = _IotMqtt_SerializePuback;\r
-    void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
 \r
     IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",\r
                  pMqttConnection,\r
@@ -669,57 +690,82 @@ static void _sendPuback( _mqttConnection_t * pMqttConnection,
             EMPTY_ELSE_MARKER;\r
         }\r
     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
-    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
-        if( pMqttConnection->pSerializer != NULL )\r
-        {\r
-            if( pMqttConnection->pSerializer->freePacket != NULL )\r
-            {\r
-                freePacket = pMqttConnection->pSerializer->freePacket;\r
-            }\r
-            else\r
-            {\r
-                EMPTY_ELSE_MARKER;\r
-            }\r
-        }\r
-        else\r
-        {\r
-            EMPTY_ELSE_MARKER;\r
-        }\r
-    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+    /* Create a PUBACK operation. */\r
+    status = _IotMqtt_CreateOperation( pMqttConnection,\r
+                                       0,\r
+                                       NULL,\r
+                                       &pPubackOperation );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Set the operation type. */\r
+    pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;\r
 \r
     /* Generate a PUBACK packet from the packet identifier. */\r
-    serializeStatus = serializePuback( packetIdentifier,\r
-                                       &pPuback,\r
-                                       &pubackSize );\r
+    status = serializePuback( packetIdentifier,\r
+                              &( pPubackOperation->u.operation.pMqttPacket ),\r
+                              &( pPubackOperation->u.operation.packetSize ) );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Add the PUBACK operation to the send queue for network transmission. */\r
+    status = _IotMqtt_ScheduleOperation( pPubackOperation,\r
+                                         _IotMqtt_ProcessSend,\r
+                                         0 );\r
 \r
-    if( serializeStatus != IOT_MQTT_SUCCESS )\r
+    if( status != IOT_MQTT_SUCCESS )\r
     {\r
-        IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "\r
-                    "received PUBLISH %hu.",\r
-                    pMqttConnection,\r
-                    packetIdentifier );\r
+        IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",\r
+                     pMqttConnection );\r
+\r
+        IOT_GOTO_CLEANUP();\r
     }\r
     else\r
     {\r
-        bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
-                                                              pPuback,\r
-                                                              pubackSize );\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Clean up on error. */\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
 \r
-        if( bytesSent != pubackSize )\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        if( pPubackOperation != NULL )\r
         {\r
-            IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"\r
-                        " PUBLISH %hu.",\r
-                        pMqttConnection,\r
-                        packetIdentifier );\r
+            _IotMqtt_DestroyOperation( pPubackOperation );\r
         }\r
         else\r
         {\r
-            IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",\r
-                         pMqttConnection,\r
-                         packetIdentifier );\r
+            EMPTY_ELSE_MARKER;\r
         }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
 \r
-        freePacket( pPuback );\r
+static void _flushPacket( void * pNetworkConnection,\r
+                          const _mqttConnection_t * pMqttConnection,\r
+                          size_t length )\r
+{\r
+    size_t bytesFlushed = 0;\r
+    uint8_t receivedByte = 0;\r
+\r
+    for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )\r
+    {\r
+        ( void ) _IotMqtt_GetNextByte( pNetworkConnection,\r
+                                       pMqttConnection->pNetworkInterface,\r
+                                       &receivedByte );\r
     }\r
 }\r
 \r
@@ -761,17 +807,27 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
     IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;\r
     IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };\r
+    void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;\r
+\r
+    /* Disconnect callback function. */\r
+    void ( * disconnectCallback )( void *,\r
+                                   IotMqttCallbackParam_t * ) = NULL;\r
+\r
+    /* Network close function. */\r
+    IotNetworkError_t ( * closeConnection) ( void * ) = NULL;\r
+\r
+    /* Default free packet function. */\r
+    void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
 \r
     /* Mark the MQTT connection as disconnected and the keep-alive as failed. */\r
     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
     pMqttConnection->disconnected = true;\r
-    pMqttConnection->keepAliveFailure = true;\r
 \r
-    if( pMqttConnection->keepAliveMs != 0 )\r
+    if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
     {\r
         /* Keep-alive must have a PINGREQ allocated. */\r
-        IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );\r
-        IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );\r
+        IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );\r
+        IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );\r
 \r
         /* PINGREQ provides a reference to the connection, so reference count must\r
          * be nonzero. */\r
@@ -779,7 +835,7 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
 \r
         /* Attempt to cancel the keep-alive job. */\r
         taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
-                                                pMqttConnection->keepAliveJob,\r
+                                                pMqttConnection->pingreq.job,\r
                                                 NULL );\r
 \r
         /* If the keep-alive job was not canceled, it must be already executing.\r
@@ -791,13 +847,23 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
          * the executing keep-alive job will clean up itself. */\r
         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
         {\r
-            /* Clean up PINGREQ packet and job. */\r
-            _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+            /* Choose a function to free the packet. */\r
+            #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+                if( pMqttConnection->pSerializer != NULL )\r
+                {\r
+                    if( pMqttConnection->pSerializer->freePacket != NULL )\r
+                    {\r
+                        freePacket = pMqttConnection->pSerializer->freePacket;\r
+                    }\r
+                }\r
+            #endif\r
+\r
+            freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
 \r
             /* Clear data about the keep-alive. */\r
-            pMqttConnection->keepAliveMs = 0;\r
-            pMqttConnection->pPingreqPacket = NULL;\r
-            pMqttConnection->pingreqPacketSize = 0;\r
+            pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
+            pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
+            pMqttConnection->pingreq.u.operation.packetSize = 0;\r
 \r
             /* Keep-alive is cleaned up; decrement reference count. Since this\r
              * function must be followed with a call to DISCONNECT, a check to\r
@@ -817,12 +883,20 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
         EMPTY_ELSE_MARKER;\r
     }\r
 \r
+    /* Copy the function pointers and contexts, as the MQTT connection may be\r
+     * modified after the mutex is released. */\r
+    disconnectCallback = pMqttConnection->disconnectCallback.function;\r
+    pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;\r
+\r
+    closeConnection = pMqttConnection->pNetworkInterface->close;\r
+    pNetworkConnection = pMqttConnection->pNetworkConnection;\r
+\r
     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
 \r
     /* Close the network connection. */\r
-    if( pMqttConnection->pNetworkInterface->close != NULL )\r
+    if( closeConnection != NULL )\r
     {\r
-        closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );\r
+        closeStatus = closeConnection( pNetworkConnection );\r
 \r
         if( closeStatus == IOT_NETWORK_SUCCESS )\r
         {\r
@@ -842,14 +916,14 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
     }\r
 \r
     /* Invoke the disconnect callback. */\r
-    if( pMqttConnection->disconnectCallback.function != NULL )\r
+    if( disconnectCallback != NULL )\r
     {\r
         /* Set the members of the callback parameter. */\r
         callbackParam.mqttConnection = pMqttConnection;\r
         callbackParam.u.disconnectReason = disconnectReason;\r
 \r
-        pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,\r
-                                                      &callbackParam );\r
+        disconnectCallback( pDisconnectCallbackContext,\r
+                            &callbackParam );\r
     }\r
     else\r
     {\r
index 9e4ec282e0c24108f41e512e718b822815008595..af343f6373eab44cb89904d9bd2f800b3522f26f 100644 (file)
@@ -44,6 +44,9 @@
 #include "platform/iot_clock.h"\r
 #include "platform/iot_threads.h"\r
 \r
+/* Atomics include. */\r
+#include "iot_atomic.h"\r
+\r
 /*-----------------------------------------------------------*/\r
 \r
 /**\r
@@ -133,7 +136,7 @@ static bool _mqttOperation_match( const IotLink_t * pOperationLink,
 static bool _checkRetryLimit( _mqttOperation_t * pOperation )\r
 {\r
     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
-    bool status = true;\r
+    bool status = true, setDup = false;\r
 \r
     /* Choose a set DUP function. */\r
     void ( * publishSetDup )( uint8_t *,\r
@@ -162,36 +165,65 @@ static bool _checkRetryLimit( _mqttOperation_t * pOperation )
     IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );\r
 \r
     /* Check if the retry limit is exhausted. */\r
-    if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
+    if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit )\r
     {\r
         /* The retry count may be at most one more than the retry limit, which\r
          * accounts for the final check for a PUBACK. */\r
-        IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 );\r
+        IotMqtt_Assert( pOperation->u.operation.periodic.retry.count ==\r
+                        pOperation->u.operation.periodic.retry.limit + 1 );\r
 \r
         IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",\r
                      pMqttConnection,\r
                      pOperation,\r
-                     pOperation->u.operation.retry.limit );\r
+                     pOperation->u.operation.periodic.retry.limit );\r
 \r
         status = false;\r
     }\r
-    /* Check if this is the first retry. */\r
-    else if( pOperation->u.operation.retry.count == 1 )\r
-    {\r
-        /* Always set the DUP flag on the first retry. */\r
-        publishSetDup( pOperation->u.operation.pMqttPacket,\r
-                       pOperation->u.operation.pPacketIdentifierHigh,\r
-                       &( pOperation->u.operation.packetIdentifier ) );\r
-    }\r
     else\r
     {\r
-        /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet\r
-         * identifier) must be reset on every retry. */\r
-        if( pMqttConnection->awsIotMqttMode == true )\r
+        if( pOperation->u.operation.periodic.retry.count == 1 )\r
+        {\r
+            /* The DUP flag should always be set on the first retry. */\r
+            setDup = true;\r
+        }\r
+        else if( pMqttConnection->awsIotMqttMode == true )\r
         {\r
+            /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet\r
+             * identifier) must be reset on every retry. */\r
+            setDup = true;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        if( setDup == true )\r
+        {\r
+            /* In AWS IoT MQTT mode, the references mutex must be locked to\r
+             * prevent the packet identifier from being read while it is being\r
+             * changed. */\r
+            if( pMqttConnection->awsIotMqttMode == true )\r
+            {\r
+                IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+\r
+            /* Always set the DUP flag on the first retry. */\r
             publishSetDup( pOperation->u.operation.pMqttPacket,\r
                            pOperation->u.operation.pPacketIdentifierHigh,\r
                            &( pOperation->u.operation.packetIdentifier ) );\r
+\r
+            if( pMqttConnection->awsIotMqttMode == true )\r
+            {\r
+                IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
         }\r
         else\r
         {\r
@@ -213,14 +245,16 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
 \r
     /* This function should never be called with retry count greater than\r
      * retry limit. */\r
-    IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit );\r
+    IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <=\r
+                    pOperation->u.operation.periodic.retry.limit );\r
 \r
     /* Increment the retry count. */\r
-    ( pOperation->u.operation.retry.count )++;\r
+    ( pOperation->u.operation.periodic.retry.count )++;\r
 \r
     /* Check for a response shortly for the final retry. Otherwise, calculate the\r
      * next retry period. */\r
-    if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
+    if( pOperation->u.operation.periodic.retry.count >\r
+        pOperation->u.operation.periodic.retry.limit )\r
     {\r
         scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;\r
 \r
@@ -232,14 +266,14 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
     }\r
     else\r
     {\r
-        scheduleDelay = pOperation->u.operation.retry.nextPeriod;\r
+        scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs;\r
 \r
         /* Double the retry period, subject to a ceiling value. */\r
-        pOperation->u.operation.retry.nextPeriod *= 2;\r
+        pOperation->u.operation.periodic.retry.nextPeriodMs *= 2;\r
 \r
-        if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING )\r
+        if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING )\r
         {\r
-            pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING;\r
+            pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING;\r
         }\r
         else\r
         {\r
@@ -249,12 +283,12 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
         IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",\r
                      pMqttConnection,\r
                      pOperation,\r
-                     ( unsigned long ) pOperation->u.operation.retry.count,\r
-                     ( unsigned long ) pOperation->u.operation.retry.limit,\r
+                     ( unsigned long ) pOperation->u.operation.periodic.retry.count,\r
+                     ( unsigned long ) pOperation->u.operation.periodic.retry.limit,\r
                      ( unsigned long ) scheduleDelay );\r
 \r
         /* Check if this is the first retry. */\r
-        firstRetry = ( pOperation->u.operation.retry.count == 1 );\r
+        firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 );\r
 \r
         /* On the first retry, the PUBLISH will be moved from the pending processing\r
          * list to the pending responses list. Lock the connection references mutex\r
@@ -332,7 +366,7 @@ IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
         {\r
             IotLogError( "Callback should not be set for a waitable operation." );\r
 \r
-            return IOT_MQTT_BAD_PARAMETER;\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
         }\r
         else\r
         {\r
@@ -509,8 +543,8 @@ bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
                      pMqttConnection,\r
                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
                      pOperation,\r
-                     pOperation->u.operation.jobReference + 1,\r
-                     pOperation->u.operation.jobReference );\r
+                     ( long ) ( pOperation->u.operation.jobReference + 1 ),\r
+                     ( long ) ( pOperation->u.operation.jobReference ) );\r
 \r
         /* The job reference count must be 0 or 1 after the decrement. */\r
         IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||\r
@@ -649,23 +683,30 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
                                 void * pContext )\r
 {\r
     bool status = true;\r
+    uint32_t swapStatus = 0;\r
     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
     size_t bytesSent = 0;\r
 \r
+    /* Swap status is not checked when asserts are disabled. */\r
+    ( void ) swapStatus;\r
+\r
     /* Retrieve the MQTT connection from the context. */\r
     _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;\r
+    _mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq );\r
 \r
     /* Check parameters. */\r
     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
-    IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob );\r
+    IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job );\r
 \r
     /* Check that keep-alive interval is valid. The MQTT spec states its maximum\r
      * value is 65,535 seconds. */\r
-    IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 );\r
+    IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 );\r
 \r
     /* Only two values are valid for the next keep alive job delay. */\r
-    IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) ||\r
-                    ( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) );\r
+    IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==\r
+                      pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) ||\r
+                    ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs\r
+                      == IOT_MQTT_RESPONSE_WAIT_MS ) );\r
 \r
     IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );\r
 \r
@@ -676,10 +717,9 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
                                             &pKeepAliveJob );\r
     IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
 \r
-    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
-\r
     /* Determine whether to send a PINGREQ or check for PINGRESP. */\r
-    if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs )\r
+    if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==\r
+        pPingreqOperation->u.operation.periodic.ping.keepAliveMs )\r
     {\r
         IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );\r
 \r
@@ -687,10 +727,10 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
          * more important than other operations. Bypass the queue of jobs for\r
          * operations by directly sending the PINGREQ in this job. */\r
         bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
-                                                              pMqttConnection->pPingreqPacket,\r
-                                                              pMqttConnection->pingreqPacketSize );\r
+                                                              pPingreqOperation->u.operation.pMqttPacket,\r
+                                                              pPingreqOperation->u.operation.packetSize );\r
 \r
-        if( bytesSent != pMqttConnection->pingreqPacketSize )\r
+        if( bytesSent != pPingreqOperation->u.operation.packetSize )\r
         {\r
             IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );\r
             status = false;\r
@@ -699,10 +739,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
         {\r
             /* Assume the keep-alive will fail. The network receive callback will\r
              * clear the failure flag upon receiving a PINGRESP. */\r
-            pMqttConnection->keepAliveFailure = true;\r
+            swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ),\r
+                                                    1,\r
+                                                    0 );\r
+            IotMqtt_Assert( swapStatus == 1 );\r
 \r
             /* Schedule a check for PINGRESP. */\r
-            pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS;\r
+            pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS;\r
 \r
             IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",\r
                          pMqttConnection,\r
@@ -713,12 +756,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
     {\r
         IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );\r
 \r
-        if( pMqttConnection->keepAliveFailure == false )\r
+        if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 )\r
         {\r
             IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );\r
 \r
             /* PINGRESP was received. Schedule the next PINGREQ transmission. */\r
-            pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+            pPingreqOperation->u.operation.periodic.ping.nextPeriodMs =\r
+                pPingreqOperation->u.operation.periodic.ping.keepAliveMs;\r
         }\r
         else\r
         {\r
@@ -737,13 +781,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
     {\r
         taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,\r
                                                        pKeepAliveJob,\r
-                                                       pMqttConnection->nextKeepAliveMs );\r
+                                                       pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );\r
 \r
         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
         {\r
-            IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.",\r
+            IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.",\r
                          pMqttConnection,\r
-                         IOT_MQTT_RESPONSE_WAIT_MS );\r
+                         ( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );\r
         }\r
         else\r
         {\r
@@ -769,8 +813,6 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
     {\r
         EMPTY_ELSE_MARKER;\r
     }\r
-\r
-    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
 }\r
 \r
 /*-----------------------------------------------------------*/\r
@@ -799,6 +841,8 @@ void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
     }\r
     else\r
     {\r
+        /* This operation may have already been removed by cleanup of pending\r
+         * operations (called from Disconnect). In that case, do nothing here. */\r
         EMPTY_ELSE_MARKER;\r
     }\r
 \r
@@ -810,15 +854,9 @@ void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
     _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,\r
                                          &callbackParam );\r
 \r
-    /* Free any buffers associated with the current PUBLISH message. */\r
-    if( pOperation->u.publish.pReceivedData != NULL )\r
-    {\r
-        IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );\r
-    }\r
-    else\r
-    {\r
-        EMPTY_ELSE_MARKER;\r
-    }\r
+    /* Free buffers associated with the current PUBLISH message. */\r
+    IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL );\r
+    IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );\r
 \r
     /* Free the incoming PUBLISH operation. */\r
     IotMqtt_FreeOperation( pOperation );\r
@@ -851,7 +889,7 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
     waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
 \r
     /* Check PUBLISH retry counts and limits. */\r
-    if( pOperation->u.operation.retry.limit > 0 )\r
+    if( pOperation->u.operation.periodic.retry.limit > 0 )\r
     {\r
         if( _checkRetryLimit( pOperation ) == false )\r
         {\r
@@ -923,7 +961,7 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
     if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
     {\r
         /* Check if this operation should be scheduled for retransmission. */\r
-        if( pOperation->u.operation.retry.limit > 0 )\r
+        if( pOperation->u.operation.periodic.retry.limit > 0 )\r
         {\r
             if( _scheduleNextRetry( pOperation ) == false )\r
             {\r
@@ -1141,7 +1179,7 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
 \r
         /* Check if the matched operation is a PUBLISH with retry. If it is, cancel\r
          * the retry job. */\r
-        if( pResult->u.operation.retry.limit > 0 )\r
+        if( pResult->u.operation.periodic.retry.limit > 0 )\r
         {\r
             taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
                                                     pResult->job,\r
@@ -1188,7 +1226,7 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
 \r
     if( pResult != NULL )\r
     {\r
-        IotLogDebug( "(MQTT connection %p) Found operation %s." ,\r
+        IotLogDebug( "(MQTT connection %p) Found operation %s.",\r
                      pMqttConnection,\r
                      IotMqtt_OperationType( type ) );\r
 \r
index f42c80c6aafe7428f96918d374ca059ed8174c9b..d3447eefd777e11847ee712ea2df76598081f5fa 100644 (file)
@@ -1375,12 +1375,12 @@ IotMqttError_t _IotMqtt_DeserializePublish( _mqttPacket_t * pPublish )
      * a packet identifer, but QoS 0 PUBLISH packets do not. */\r
     if( pOutput->qos == IOT_MQTT_QOS_0 )\r
     {\r
-        pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) );\r
+        pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) );\r
         pOutput->pPayload = pPacketIdentifierHigh;\r
     }\r
     else\r
     {\r
-        pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) );\r
+        pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) );\r
         pOutput->pPayload = pPacketIdentifierHigh + sizeof( uint16_t );\r
     }\r
 \r
index 59caccc7fb33103fef9701b2ef77e55a0e4238a4..f4546b55826a27edb439a6abce0ebf01865e0996 100644 (file)
@@ -586,7 +586,7 @@ void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnecti
 bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,\r
                            const char * pTopicFilter,\r
                            uint16_t topicFilterLength,\r
-                           IotMqttSubscription_t * pCurrentSubscription )\r
+                           IotMqttSubscription_t * const pCurrentSubscription )\r
 {\r
     bool status = false;\r
     _mqttSubscription_t * pSubscription = NULL;\r
@@ -595,7 +595,7 @@ bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
 \r
     topicMatchParams.pTopicName = pTopicFilter;\r
     topicMatchParams.topicNameLength = topicFilterLength;\r
-    topicMatchParams.exactMatchOnly = false;\r
+    topicMatchParams.exactMatchOnly = true;\r
 \r
     /* Prevent any other thread from modifying the subscription list while this\r
      * function is running. */\r
index 80aef1c60dd25f6c11b45a7e9fedf0b1c38beee8..5ac1b948995c619d4d54c19b344cacc080b7fc4f 100644 (file)
 /*---------------------- MQTT internal data structures ----------------------*/\r
 \r
 /**\r
- * @brief Represents an MQTT connection.\r
- */\r
-typedef struct _mqttConnection\r
-{\r
-    bool awsIotMqttMode;                             /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */\r
-    bool ownNetworkConnection;                       /**< @brief Whether this MQTT connection owns its network connection. */\r
-    void * pNetworkConnection;                       /**< @brief References the transport-layer network connection. */\r
-    const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */\r
-    IotMqttCallbackInfo_t disconnectCallback;        /**< @brief A function to invoke when this connection is disconnected. */\r
-\r
-    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
-        const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */\r
-    #endif\r
-\r
-    bool disconnected;                              /**< @brief Tracks if this connection has been disconnected. */\r
-    IotMutex_t referencesMutex;                     /**< @brief Recursive mutex. Grants access to connection state and operation lists. */\r
-    int32_t references;                             /**< @brief Counts callbacks and operations using this connection. */\r
-    IotListDouble_t pendingProcessing;              /**< @brief List of operations waiting to be processed by a task pool routine. */\r
-    IotListDouble_t pendingResponse;                /**< @brief List of processed operations awaiting a server response. */\r
-\r
-    IotListDouble_t subscriptionList;               /**< @brief Holds subscriptions associated with this connection. */\r
-    IotMutex_t subscriptionMutex;                   /**< @brief Grants exclusive access to the subscription list. */\r
-\r
-    bool keepAliveFailure;                          /**< @brief Failure flag for keep-alive operation. */\r
-    uint32_t keepAliveMs;                           /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */\r
-    uint32_t nextKeepAliveMs;                       /**< @brief Relative delay for next keep-alive job. */\r
-    IotTaskPoolJobStorage_t keepAliveJobStorage;     /**< @brief Task pool job for processing this connection's keep-alive. */\r
-    IotTaskPoolJob_t keepAliveJob;                  /**< @brief Task pool job for processing this connection's keep-alive. */\r
-    uint8_t * pPingreqPacket;                       /**< @brief An MQTT PINGREQ packet, allocated if keep-alive is active. */\r
-    size_t pingreqPacketSize;                       /**< @brief The size of an allocated PINGREQ packet. */\r
-} _mqttConnection_t;\r
-\r
-/**\r
- * @brief Represents a subscription stored in an MQTT connection.\r
+ * @cond DOXYGEN_IGNORE\r
+ * Doxygen should ignore this section.\r
+ *\r
+ * Forward declaration of MQTT connection type.\r
  */\r
-typedef struct _mqttSubscription\r
-{\r
-    IotLink_t link;     /**< @brief List link member. */\r
-\r
-    int32_t references; /**< @brief How many subscription callbacks are using this subscription. */\r
-\r
-    /**\r
-     * @brief Tracks whether @ref mqtt_function_unsubscribe has been called for\r
-     * this subscription.\r
-     *\r
-     * If there are active subscription callbacks, @ref mqtt_function_unsubscribe\r
-     * cannot remove this subscription. Instead, it will set this flag, which\r
-     * schedules the removal of this subscription once all subscription callbacks\r
-     * terminate.\r
-     */\r
-    bool unsubscribed;\r
-\r
-    struct\r
-    {\r
-        uint16_t identifier;        /**< @brief Packet identifier. */\r
-        size_t order;               /**< @brief Order in the packet's list of subscriptions. */\r
-    } packetInfo;                   /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */\r
-\r
-    IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */\r
-\r
-    uint16_t topicFilterLength;     /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */\r
-    char pTopicFilter[];            /**< @brief The subscription topic filter. */\r
-} _mqttSubscription_t;\r
+struct _mqttConnection;\r
+/** @endcond */\r
 \r
 /**\r
  * @brief Internal structure representing a single MQTT operation, such as\r
@@ -321,13 +265,13 @@ typedef struct _mqttSubscription
 typedef struct _mqttOperation\r
 {\r
     /* Pointers to neighboring queue elements. */\r
-    IotLink_t link;                      /**< @brief List link member. */\r
+    IotLink_t link;                           /**< @brief List link member. */\r
 \r
-    bool incomingPublish;                /**< @brief Set to true if this operation an incoming PUBLISH. */\r
-    _mqttConnection_t * pMqttConnection; /**< @brief MQTT connection associated with this operation. */\r
+    bool incomingPublish;                     /**< @brief Set to true if this operation an incoming PUBLISH. */\r
+    struct _mqttConnection * pMqttConnection; /**< @brief MQTT connection associated with this operation. */\r
 \r
-    IotTaskPoolJobStorage_t jobStorage;  /**< @brief Task pool job storage associated with this operation. */\r
-    IotTaskPoolJob_t job;                /**< @brief Task pool job associated with this operation. */\r
+    IotTaskPoolJobStorage_t jobStorage;       /**< @brief Task pool job storage associated with this operation. */\r
+    IotTaskPoolJob_t job;                     /**< @brief Task pool job associated with this operation. */\r
 \r
     union\r
     {\r
@@ -353,12 +297,22 @@ typedef struct _mqttOperation
             } notify;                           /**< @brief How to notify of this operation's completion. */\r
             IotMqttError_t status;              /**< @brief Result of this operation. This is reported once a response is received. */\r
 \r
-            struct\r
+            union\r
             {\r
-                uint32_t count;\r
-                uint32_t limit;\r
-                uint32_t nextPeriod;\r
-            } retry;\r
+                struct\r
+                {\r
+                    uint32_t count;        /**< @brief Current number of retries. */\r
+                    uint32_t limit;        /**< @brief Maximum number of retries allowed. */\r
+                    uint32_t nextPeriodMs; /**< @brief Next retry period. */\r
+                } retry;                   /**< @brief Additional information for PUBLISH retry. */\r
+\r
+                struct\r
+                {\r
+                    uint32_t failure;      /**< @brief Flag tracking keep-alive status. */\r
+                    uint32_t keepAliveMs;     /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */\r
+                    uint32_t nextPeriodMs; /**< @brief Relative delay for next keep-alive job. */\r
+                } ping;                    /**< @brief Additional information for keep-alive pings. */\r
+            } periodic;                    /**< @brief Additional information for periodic operations. */\r
         } operation;\r
 \r
         /* If incomingPublish is true, this struct is valid. */\r
@@ -370,6 +324,65 @@ typedef struct _mqttOperation
     } u;                                      /**< @brief Valid member depends on _mqttOperation_t.incomingPublish. */\r
 } _mqttOperation_t;\r
 \r
+/**\r
+ * @brief Represents an MQTT connection.\r
+ */\r
+typedef struct _mqttConnection\r
+{\r
+    bool awsIotMqttMode;                             /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */\r
+    bool ownNetworkConnection;                       /**< @brief Whether this MQTT connection owns its network connection. */\r
+    void * pNetworkConnection;                       /**< @brief References the transport-layer network connection. */\r
+    const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */\r
+    IotMqttCallbackInfo_t disconnectCallback;        /**< @brief A function to invoke when this connection is disconnected. */\r
+\r
+    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+        const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */\r
+    #endif\r
+\r
+    bool disconnected;                 /**< @brief Tracks if this connection has been disconnected. */\r
+    IotMutex_t referencesMutex;        /**< @brief Recursive mutex. Grants access to connection state and operation lists. */\r
+    int32_t references;                /**< @brief Counts callbacks and operations using this connection. */\r
+    IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */\r
+    IotListDouble_t pendingResponse;   /**< @brief List of processed operations awaiting a server response. */\r
+\r
+    IotListDouble_t subscriptionList;  /**< @brief Holds subscriptions associated with this connection. */\r
+    IotMutex_t subscriptionMutex;      /**< @brief Grants exclusive access to the subscription list. */\r
+\r
+    _mqttOperation_t pingreq;          /**< @brief Operation used for MQTT keep-alive. */\r
+} _mqttConnection_t;\r
+\r
+/**\r
+ * @brief Represents a subscription stored in an MQTT connection.\r
+ */\r
+typedef struct _mqttSubscription\r
+{\r
+    IotLink_t link;     /**< @brief List link member. */\r
+\r
+    int32_t references; /**< @brief How many subscription callbacks are using this subscription. */\r
+\r
+    /**\r
+     * @brief Tracks whether @ref mqtt_function_unsubscribe has been called for\r
+     * this subscription.\r
+     *\r
+     * If there are active subscription callbacks, @ref mqtt_function_unsubscribe\r
+     * cannot remove this subscription. Instead, it will set this flag, which\r
+     * schedules the removal of this subscription once all subscription callbacks\r
+     * terminate.\r
+     */\r
+    bool unsubscribed;\r
+\r
+    struct\r
+    {\r
+        uint16_t identifier;        /**< @brief Packet identifier. */\r
+        size_t order;               /**< @brief Order in the packet's list of subscriptions. */\r
+    } packetInfo;                   /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */\r
+\r
+    IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */\r
+\r
+    uint16_t topicFilterLength;     /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */\r
+    char pTopicFilter[];            /**< @brief The subscription topic filter. */\r
+} _mqttSubscription_t;\r
+\r
 /**\r
  * @brief Represents an MQTT packet received from the network.\r
  *\r