]> git.sur5r.net Git - freertos/blobdiff - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
Rename \FreeRTOS-Plus\Source\FreeRTOS-Plus-IoT-SDK to \FreeRTOS-Plus\Source\FreeRTOS...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_operation.c
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
new file mode 100644 (file)
index 0000000..7923d62
--- /dev/null
@@ -0,0 +1,1332 @@
+/*\r
+ * Amazon FreeRTOS MQTT V2.0.0\r
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ *\r
+ * http://aws.amazon.com/freertos\r
+ * http://www.FreeRTOS.org\r
+ */\r
+\r
+/**\r
+ * @file iot_mqtt_operation.c\r
+ * @brief Implements functions that process MQTT operations.\r
+ */\r
+\r
+/* The config header is always included first. */\r
+#include "iot_config.h"\r
+\r
+/* Standard includes. */\r
+#include <string.h>\r
+\r
+/* Error handling include. */\r
+#include "private/iot_error.h"\r
+\r
+/* MQTT internal include. */\r
+#include "private/iot_mqtt_internal.h"\r
+\r
+/* Platform layer includes. */\r
+#include "platform/iot_clock.h"\r
+#include "platform/iot_threads.h"\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief First parameter to #_mqttOperation_match.\r
+ */\r
+typedef struct _operationMatchParam\r
+{\r
+    IotMqttOperationType_t type;        /**< @brief The type of operation to look for. */\r
+    const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation.\r
+                                         * Set to `NULL` to ignore packet identifier. */\r
+} _operationMatchParam_t;\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief Match an MQTT operation by type and packet identifier.\r
+ *\r
+ * @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t.\r
+ * @param[in] pMatch Pointer to an #_operationMatchParam_t.\r
+ *\r
+ * @return `true` if the operation matches the parameters in `pArgument`; `false`\r
+ * otherwise.\r
+ */\r
+static bool _mqttOperation_match( const IotLink_t * pOperationLink,\r
+                                  void * pMatch );\r
+\r
+/**\r
+ * @brief Check if an operation with retry has exceeded its retry limit.\r
+ *\r
+ * If a PUBLISH operation is available for retry, this function also sets any\r
+ * necessary DUP flags.\r
+ *\r
+ * @param[in] pOperation The operation to check.\r
+ *\r
+ * @return `true` if the operation may be retried; `false` otherwise.\r
+ */\r
+static bool _checkRetryLimit( _mqttOperation_t * pOperation );\r
+\r
+/**\r
+ * @brief Schedule the next send of an operation with retry.\r
+ *\r
+ * @param[in] pOperation The operation to schedule.\r
+ *\r
+ * @return `true` if the reschedule succeeded; `false` otherwise.\r
+ */\r
+static bool _scheduleNextRetry( _mqttOperation_t * pOperation );\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _mqttOperation_match( const IotLink_t * pOperationLink,\r
+                                  void * pMatch )\r
+{\r
+    bool match = false;\r
+\r
+    /* Because this function is called from a container function, the given link\r
+     * must never be NULL. */\r
+    IotMqtt_Assert( pOperationLink != NULL );\r
+\r
+    _mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t,\r
+                                                       pOperationLink,\r
+                                                       link );\r
+    _operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch;\r
+\r
+    /* Check for matching operations. */\r
+    if( pParam->type == pOperation->u.operation.type )\r
+    {\r
+        /* Check for matching packet identifiers. */\r
+        if( pParam->pPacketIdentifier == NULL )\r
+        {\r
+            match = true;\r
+        }\r
+        else\r
+        {\r
+            match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier );\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return match;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _checkRetryLimit( _mqttOperation_t * pOperation )\r
+{\r
+    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+    bool status = true;\r
+\r
+    /* Choose a set DUP function. */\r
+    void ( * publishSetDup )( uint8_t *,\r
+                              uint8_t *,\r
+                              uint16_t * ) = _IotMqtt_PublishSetDup;\r
+\r
+    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+        if( pMqttConnection->pSerializer != NULL )\r
+        {\r
+            if( pMqttConnection->pSerializer->serialize.publishSetDup != NULL )\r
+            {\r
+                publishSetDup = pMqttConnection->pSerializer->serialize.publishSetDup;\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+    /* Only PUBLISH may be retried. */\r
+    IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );\r
+\r
+    /* Check if the retry limit is exhausted. */\r
+    if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
+    {\r
+        /* The retry count may be at most one more than the retry limit, which\r
+         * accounts for the final check for a PUBACK. */\r
+        IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 );\r
+\r
+        IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",\r
+                     pMqttConnection,\r
+                     pOperation,\r
+                     pOperation->u.operation.retry.limit );\r
+\r
+        status = false;\r
+    }\r
+    /* Check if this is the first retry. */\r
+    else if( pOperation->u.operation.retry.count == 1 )\r
+    {\r
+        /* Always set the DUP flag on the first retry. */\r
+        publishSetDup( pOperation->u.operation.pMqttPacket,\r
+                       pOperation->u.operation.pPacketIdentifierHigh,\r
+                       &( pOperation->u.operation.packetIdentifier ) );\r
+    }\r
+    else\r
+    {\r
+        /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet\r
+         * identifier) must be reset on every retry. */\r
+        if( pMqttConnection->awsIotMqttMode == true )\r
+        {\r
+            publishSetDup( pOperation->u.operation.pMqttPacket,\r
+                           pOperation->u.operation.pPacketIdentifierHigh,\r
+                           &( pOperation->u.operation.packetIdentifier ) );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _scheduleNextRetry( _mqttOperation_t * pOperation )\r
+{\r
+    bool firstRetry = false;\r
+    uint32_t scheduleDelay = 0;\r
+    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+    /* This function should never be called with retry count greater than\r
+     * retry limit. */\r
+    IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit );\r
+\r
+    /* Increment the retry count. */\r
+    ( pOperation->u.operation.retry.count )++;\r
+\r
+    /* Check for a response shortly for the final retry. Otherwise, calculate the\r
+     * next retry period. */\r
+    if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
+    {\r
+        scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;\r
+\r
+        IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check "\r
+                     "for response in %d ms.",\r
+                     pMqttConnection,\r
+                     pOperation,\r
+                     IOT_MQTT_RESPONSE_WAIT_MS );\r
+    }\r
+    else\r
+    {\r
+        scheduleDelay = pOperation->u.operation.retry.nextPeriod;\r
+\r
+        /* Double the retry period, subject to a ceiling value. */\r
+        pOperation->u.operation.retry.nextPeriod *= 2;\r
+\r
+        if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING )\r
+        {\r
+            pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",\r
+                     pMqttConnection,\r
+                     pOperation,\r
+                     ( unsigned long ) pOperation->u.operation.retry.count,\r
+                     ( unsigned long ) pOperation->u.operation.retry.limit,\r
+                     ( unsigned long ) scheduleDelay );\r
+\r
+        /* Check if this is the first retry. */\r
+        firstRetry = ( pOperation->u.operation.retry.count == 1 );\r
+\r
+        /* On the first retry, the PUBLISH will be moved from the pending processing\r
+         * list to the pending responses list. Lock the connection references mutex\r
+         * to manipulate the lists. */\r
+        if( firstRetry == true )\r
+        {\r
+            IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+\r
+    /* Reschedule the PUBLISH for another send. */\r
+    status = _IotMqtt_ScheduleOperation( pOperation,\r
+                                         _IotMqtt_ProcessSend,\r
+                                         scheduleDelay );\r
+\r
+    /* Check for successful reschedule. */\r
+    if( status == IOT_MQTT_SUCCESS )\r
+    {\r
+        /* Move a successfully rescheduled PUBLISH from the pending processing\r
+         * list to the pending responses list on the first retry. */\r
+        if( firstRetry == true )\r
+        {\r
+            /* Operation must be linked. */\r
+            IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) == true );\r
+\r
+            /* Transfer to pending response list. */\r
+            IotListDouble_Remove( &( pOperation->link ) );\r
+            IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),\r
+                                      &( pOperation->link ) );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* The references mutex only needs to be unlocked on the first retry, since\r
+     * only the first retry manipulates the connection lists. */\r
+    if( firstRetry == true )\r
+    {\r
+        IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return( status == IOT_MQTT_SUCCESS );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,\r
+                                         uint32_t flags,\r
+                                         const IotMqttCallbackInfo_t * pCallbackInfo,\r
+                                         _mqttOperation_t ** pNewOperation )\r
+{\r
+    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
+    bool decrementOnError = false;\r
+    _mqttOperation_t * pOperation = NULL;\r
+    bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );\r
+\r
+    /* If the waitable flag is set, make sure that there's no callback. */\r
+    if( waitable == true )\r
+    {\r
+        if( pCallbackInfo != NULL )\r
+        {\r
+            IotLogError( "Callback should not be set for a waitable operation." );\r
+\r
+            return IOT_MQTT_BAD_PARAMETER;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IotLogDebug( "(MQTT connection %p) Creating new operation record.",\r
+                 pMqttConnection );\r
+\r
+    /* Increment the reference count for the MQTT connection when creating a new\r
+     * operation. */\r
+    if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false )\r
+    {\r
+        IotLogError( "(MQTT connection %p) New operation record cannot be created"\r
+                     " for a closed connection",\r
+                     pMqttConnection );\r
+\r
+        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
+    }\r
+    else\r
+    {\r
+        /* Reference count will need to be decremented on error. */\r
+        decrementOnError = true;\r
+    }\r
+\r
+    /* Allocate memory for a new operation. */\r
+    pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );\r
+\r
+    if( pOperation == NULL )\r
+    {\r
+        IotLogError( "(MQTT connection %p) Failed to allocate memory for new "\r
+                     "operation record.",\r
+                     pMqttConnection );\r
+\r
+        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
+    }\r
+    else\r
+    {\r
+        /* Clear the operation data. */\r
+        ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );\r
+\r
+        /* Initialize some members of the new operation. */\r
+        pOperation->pMqttConnection = pMqttConnection;\r
+        pOperation->u.operation.jobReference = 1;\r
+        pOperation->u.operation.flags = flags;\r
+        pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING;\r
+    }\r
+\r
+    /* Check if the waitable flag is set. If it is, create a semaphore to\r
+     * wait on. */\r
+    if( waitable == true )\r
+    {\r
+        /* Create a semaphore to wait on for a waitable operation. */\r
+        if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false )\r
+        {\r
+            IotLogError( "(MQTT connection %p) Failed to create semaphore for "\r
+                         "waitable operation.",\r
+                         pMqttConnection );\r
+\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
+        }\r
+        else\r
+        {\r
+            /* A waitable operation is created with an additional reference for the\r
+             * Wait function. */\r
+            ( pOperation->u.operation.jobReference )++;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        /* If the waitable flag isn't set but a callback is, copy the callback\r
+         * information. */\r
+        if( pCallbackInfo != NULL )\r
+        {\r
+            pOperation->u.operation.notify.callback = *pCallbackInfo;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+\r
+    /* Add this operation to the MQTT connection's operation list. */\r
+    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+    IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
+                              &( pOperation->link ) );\r
+    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    /* Set the output parameter. */\r
+    *pNewOperation = pOperation;\r
+\r
+    /* Clean up operation and decrement reference count if this function failed. */\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        if( decrementOnError == true )\r
+        {\r
+            _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        if( pOperation != NULL )\r
+        {\r
+            IotMqtt_FreeOperation( pOperation );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,\r
+                                            bool cancelJob )\r
+{\r
+    bool destroyOperation = false;\r
+    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+    /* Attempt to cancel the operation's job. */\r
+    if( cancelJob == true )\r
+    {\r
+        taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
+                                                pOperation->job,\r
+                                                NULL );\r
+\r
+        /* If the operation's job was not canceled, it must be already executing.\r
+         * Any other return value is invalid. */\r
+        IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
+                        ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
+\r
+        if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
+        {\r
+            IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.",\r
+                         pMqttConnection,\r
+                         IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                         pOperation );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Decrement job reference count. */\r
+    if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
+    {\r
+        IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+        pOperation->u.operation.jobReference--;\r
+\r
+        IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed"\r
+                     " from %ld to %ld.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                     pOperation,\r
+                     pOperation->u.operation.jobReference + 1,\r
+                     pOperation->u.operation.jobReference );\r
+\r
+        /* The job reference count must be 0 or 1 after the decrement. */\r
+        IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||\r
+                        ( pOperation->u.operation.jobReference == 1 ) );\r
+\r
+        /* This operation may be destroyed if its reference count is 0. */\r
+        if( pOperation->u.operation.jobReference == 0 )\r
+        {\r
+            destroyOperation = true;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return destroyOperation;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation )\r
+{\r
+    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+    /* Default free packet function. */\r
+    void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
+\r
+    IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.",\r
+                 pMqttConnection,\r
+                 IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                 pOperation );\r
+\r
+    /* The job reference count must be between 0 and 2. */\r
+    IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) &&\r
+                    ( pOperation->u.operation.jobReference <= 2 ) );\r
+\r
+    /* Jobs to be destroyed should be removed from the MQTT connection's\r
+     * lists. */\r
+    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
+    {\r
+        IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                     pOperation,\r
+                     pMqttConnection );\r
+\r
+        IotListDouble_Remove( &( pOperation->link ) );\r
+    }\r
+    else\r
+    {\r
+        IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                     pOperation );\r
+    }\r
+\r
+    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    /* Free any allocated MQTT packet. */\r
+    if( pOperation->u.operation.pMqttPacket != NULL )\r
+    {\r
+        #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+            if( pMqttConnection->pSerializer != NULL )\r
+            {\r
+                if( pMqttConnection->pSerializer->freePacket != NULL )\r
+                {\r
+                    freePacket = pMqttConnection->pSerializer->freePacket;\r
+                }\r
+                else\r
+                {\r
+                    EMPTY_ELSE_MARKER;\r
+                }\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+        freePacket( pOperation->u.operation.pMqttPacket );\r
+\r
+        IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                     pOperation );\r
+    }\r
+    else\r
+    {\r
+        IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                     pOperation );\r
+    }\r
+\r
+    /* Check if a wait semaphore was created for this operation. */\r
+    if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
+    {\r
+        IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) );\r
+\r
+        IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                     pOperation );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.",\r
+                 pMqttConnection,\r
+                 IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                 pOperation );\r
+\r
+    /* Free the memory used to hold operation data. */\r
+    IotMqtt_FreeOperation( pOperation );\r
+\r
+    /* Decrement the MQTT connection's reference count after destroying an\r
+     * operation. */\r
+    _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,\r
+                                IotTaskPoolJob_t pKeepAliveJob,\r
+                                void * pContext )\r
+{\r
+    bool status = true;\r
+    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+    size_t bytesSent = 0;\r
+\r
+    /* Retrieve the MQTT connection from the context. */\r
+    _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;\r
+\r
+    /* Check parameters. */\r
+    IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );\r
+    IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob );\r
+\r
+    /* Check that keep-alive interval is valid. The MQTT spec states its maximum\r
+     * value is 65,535 seconds. */\r
+    IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 );\r
+\r
+    /* Only two values are valid for the next keep alive job delay. */\r
+    IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) ||\r
+                    ( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) );\r
+\r
+    IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );\r
+\r
+    /* Re-create the keep-alive job for rescheduling. This should never fail. */\r
+    taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
+                                            pContext,\r
+                                            IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ),\r
+                                            &pKeepAliveJob );\r
+    IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
+\r
+    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    /* Determine whether to send a PINGREQ or check for PINGRESP. */\r
+    if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs )\r
+    {\r
+        IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );\r
+\r
+        /* Because PINGREQ may be used to keep the MQTT connection alive, it is\r
+         * more important than other operations. Bypass the queue of jobs for\r
+         * operations by directly sending the PINGREQ in this job. */\r
+        bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
+                                                              pMqttConnection->pPingreqPacket,\r
+                                                              pMqttConnection->pingreqPacketSize );\r
+\r
+        if( bytesSent != pMqttConnection->pingreqPacketSize )\r
+        {\r
+            IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );\r
+            status = false;\r
+        }\r
+        else\r
+        {\r
+            /* Assume the keep-alive will fail. The network receive callback will\r
+             * clear the failure flag upon receiving a PINGRESP. */\r
+            pMqttConnection->keepAliveFailure = true;\r
+\r
+            /* Schedule a check for PINGRESP. */\r
+            pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS;\r
+\r
+            IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",\r
+                         pMqttConnection,\r
+                         IOT_MQTT_RESPONSE_WAIT_MS );\r
+        }\r
+    }\r
+    else\r
+    {\r
+        IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );\r
+\r
+        if( pMqttConnection->keepAliveFailure == false )\r
+        {\r
+            IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );\r
+\r
+            /* PINGRESP was received. Schedule the next PINGREQ transmission. */\r
+            pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+        }\r
+        else\r
+        {\r
+            IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.",\r
+                         pMqttConnection,\r
+                         IOT_MQTT_RESPONSE_WAIT_MS );\r
+\r
+            /* The network receive callback did not clear the failure flag. */\r
+            status = false;\r
+        }\r
+    }\r
+\r
+    /* When a PINGREQ is successfully sent, reschedule this job to check for a\r
+     * response shortly. */\r
+    if( status == true )\r
+    {\r
+        taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,\r
+                                                       pKeepAliveJob,\r
+                                                       pMqttConnection->nextKeepAliveMs );\r
+\r
+        if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
+        {\r
+            IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.",\r
+                         pMqttConnection,\r
+                         IOT_MQTT_RESPONSE_WAIT_MS );\r
+        }\r
+        else\r
+        {\r
+            IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.",\r
+                         pMqttConnection,\r
+                         IotTaskPool_strerror( taskPoolStatus ) );\r
+\r
+            status = false;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Close the connection on failures. */\r
+    if( status == false )\r
+    {\r
+        _IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT,\r
+                                         pMqttConnection );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,\r
+                                      IotTaskPoolJob_t pPublishJob,\r
+                                      void * pContext )\r
+{\r
+    _mqttOperation_t * pOperation = pContext;\r
+    IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL };\r
+\r
+    /* Check parameters. The task pool and job parameter is not used when asserts\r
+     * are disabled. */\r
+    ( void ) pTaskPool;\r
+    ( void ) pPublishJob;\r
+    IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );\r
+    IotMqtt_Assert( pOperation->incomingPublish == true );\r
+    IotMqtt_Assert( pPublishJob == pOperation->job );\r
+\r
+    /* Remove the operation from the pending processing list. */\r
+    IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) );\r
+\r
+    if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
+    {\r
+        IotListDouble_Remove( &( pOperation->link ) );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) );\r
+\r
+    /* Process the current PUBLISH. */\r
+    callbackParam.u.message.info = pOperation->u.publish.publishInfo;\r
+\r
+    _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,\r
+                                         &callbackParam );\r
+\r
+    /* Free any buffers associated with the current PUBLISH message. */\r
+    if( pOperation->u.publish.pReceivedData != NULL )\r
+    {\r
+        IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Free the incoming PUBLISH operation. */\r
+    IotMqtt_FreeOperation( pOperation );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,\r
+                           IotTaskPoolJob_t pSendJob,\r
+                           void * pContext )\r
+{\r
+    size_t bytesSent = 0;\r
+    bool destroyOperation = false, waitable = false, networkPending = false;\r
+    _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;\r
+    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+    /* Check parameters. The task pool and job parameter is not used when asserts\r
+     * are disabled. */\r
+    ( void ) pTaskPool;\r
+    ( void ) pSendJob;\r
+    IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );\r
+    IotMqtt_Assert( pSendJob == pOperation->job );\r
+\r
+    /* The given operation must have an allocated packet and be waiting for a status. */\r
+    IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
+    IotMqtt_Assert( pOperation->u.operation.packetSize != 0 );\r
+    IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+\r
+    /* Check if this operation is waitable. */\r
+    waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
+\r
+    /* Check PUBLISH retry counts and limits. */\r
+    if( pOperation->u.operation.retry.limit > 0 )\r
+    {\r
+        if( _checkRetryLimit( pOperation ) == false )\r
+        {\r
+            pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Send an operation that is waiting for a response. */\r
+    if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
+    {\r
+        IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                     pOperation );\r
+\r
+        /* Transmit the MQTT packet from the operation over the network. */\r
+        bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
+                                                              pOperation->u.operation.pMqttPacket,\r
+                                                              pOperation->u.operation.packetSize );\r
+\r
+        /* Check transmission status. */\r
+        if( bytesSent != pOperation->u.operation.packetSize )\r
+        {\r
+            pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR;\r
+        }\r
+        else\r
+        {\r
+            /* DISCONNECT operations are considered successful upon successful\r
+             * transmission. In addition, non-waitable operations with no callback\r
+             * may also be considered successful. */\r
+            if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT )\r
+            {\r
+                /* DISCONNECT operations are always waitable. */\r
+                IotMqtt_Assert( waitable == true );\r
+\r
+                pOperation->u.operation.status = IOT_MQTT_SUCCESS;\r
+            }\r
+            else if( waitable == false )\r
+            {\r
+                if( pOperation->u.operation.notify.callback.function == NULL )\r
+                {\r
+                    pOperation->u.operation.status = IOT_MQTT_SUCCESS;\r
+                }\r
+                else\r
+                {\r
+                    EMPTY_ELSE_MARKER;\r
+                }\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check if this operation requires further processing. */\r
+    if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
+    {\r
+        /* Check if this operation should be scheduled for retransmission. */\r
+        if( pOperation->u.operation.retry.limit > 0 )\r
+        {\r
+            if( _scheduleNextRetry( pOperation ) == false )\r
+            {\r
+                pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR;\r
+            }\r
+            else\r
+            {\r
+                /* A successfully scheduled PUBLISH retry is awaiting a response\r
+                 * from the network. */\r
+                networkPending = true;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            /* Decrement reference count to signal completion of send job. Check\r
+             * if the operation should be destroyed. */\r
+            if( waitable == true )\r
+            {\r
+                destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+\r
+            /* If the operation should not be destroyed, transfer it from the\r
+             * pending processing to the pending response list. */\r
+            if( destroyOperation == false )\r
+            {\r
+                IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+                /* Operation must be linked. */\r
+                IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) );\r
+\r
+                /* Transfer to pending response list. */\r
+                IotListDouble_Remove( &( pOperation->link ) );\r
+                IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),\r
+                                          &( pOperation->link ) );\r
+\r
+                IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+                /* This operation is now awaiting a response from the network. */\r
+                networkPending = true;\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Destroy the operation or notify of completion if necessary. */\r
+    if( destroyOperation == true )\r
+    {\r
+        _IotMqtt_DestroyOperation( pOperation );\r
+    }\r
+    else\r
+    {\r
+        /* Do not check the operation status if a network response is pending,\r
+         * since a network response could modify the status. */\r
+        if( networkPending == false )\r
+        {\r
+            /* Notify of operation completion if this job set a status. */\r
+            if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )\r
+            {\r
+                _IotMqtt_Notify( pOperation );\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,\r
+                                         IotTaskPoolJob_t pOperationJob,\r
+                                         void * pContext )\r
+{\r
+    _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;\r
+    IotMqttCallbackParam_t callbackParam = { 0 };\r
+\r
+    /* Check parameters. The task pool and job parameter is not used when asserts\r
+     * are disabled. */\r
+    ( void ) pTaskPool;\r
+    ( void ) pOperationJob;\r
+    IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );\r
+    IotMqtt_Assert( pOperationJob == pOperation->job );\r
+\r
+    /* The operation's callback function and status must be set. */\r
+    IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL );\r
+    IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );\r
+\r
+    callbackParam.mqttConnection = pOperation->pMqttConnection;\r
+    callbackParam.u.operation.type = pOperation->u.operation.type;\r
+    callbackParam.u.operation.reference = pOperation;\r
+    callbackParam.u.operation.result = pOperation->u.operation.status;\r
+\r
+    /* Invoke the user callback function. */\r
+    pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,\r
+                                                      &callbackParam );\r
+\r
+    /* Attempt to destroy the operation once the user callback returns. */\r
+    if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )\r
+    {\r
+        _IotMqtt_DestroyOperation( pOperation );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,\r
+                                           IotTaskPoolRoutine_t jobRoutine,\r
+                                           uint32_t delay )\r
+{\r
+    IotMqttError_t status = IOT_MQTT_SUCCESS;\r
+    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+\r
+    /* Check that job routine is valid. */\r
+    IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) ||\r
+                    ( jobRoutine == _IotMqtt_ProcessCompletedOperation ) ||\r
+                    ( jobRoutine == _IotMqtt_ProcessIncomingPublish ) );\r
+\r
+    /* Creating a new job should never fail when parameters are valid. */\r
+    taskPoolStatus = IotTaskPool_CreateJob( jobRoutine,\r
+                                            pOperation,\r
+                                            &( pOperation->jobStorage ),\r
+                                            &( pOperation->job ) );\r
+    IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
+\r
+    /* Schedule the new job with a delay. */\r
+    taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
+                                                   pOperation->job,\r
+                                                   delay );\r
+\r
+    if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
+    {\r
+        /* Scheduling a newly-created job should never be invalid or illegal. */\r
+        IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER );\r
+        IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION );\r
+\r
+        IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.",\r
+                    pOperation->pMqttConnection,\r
+                    IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                    pOperation,\r
+                    IotTaskPool_strerror( taskPoolStatus ) );\r
+\r
+        status = IOT_MQTT_SCHEDULING_ERROR;\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+_mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,\r
+                                           IotMqttOperationType_t type,\r
+                                           const uint16_t * pPacketIdentifier )\r
+{\r
+    bool waitable = false;\r
+    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+    _mqttOperation_t * pResult = NULL;\r
+    IotLink_t * pResultLink = NULL;\r
+    _operationMatchParam_t param = { .type = type, .pPacketIdentifier = pPacketIdentifier };\r
+\r
+    if( pPacketIdentifier != NULL )\r
+    {\r
+        IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response "\r
+                     "with packet identifier %hu.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( type ),\r
+                     *pPacketIdentifier );\r
+    }\r
+    else\r
+    {\r
+        IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( type ) );\r
+    }\r
+\r
+    /* Find and remove the first matching element in the list. */\r
+    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+    pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ),\r
+                                                NULL,\r
+                                                _mqttOperation_match,\r
+                                                &param );\r
+\r
+    /* Check if a match was found. */\r
+    if( pResultLink != NULL )\r
+    {\r
+        /* Get operation pointer and check if it is waitable. */\r
+        pResult = IotLink_Container( _mqttOperation_t, pResultLink, link );\r
+        waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
+\r
+        /* Check if the matched operation is a PUBLISH with retry. If it is, cancel\r
+         * the retry job. */\r
+        if( pResult->u.operation.retry.limit > 0 )\r
+        {\r
+            taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
+                                                    pResult->job,\r
+                                                    NULL );\r
+\r
+            /* If the retry job could not be canceled, then it is currently\r
+             * executing. Ignore the operation. */\r
+            if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
+            {\r
+                pResult = NULL;\r
+            }\r
+            else\r
+            {\r
+                /* Check job reference counts. A waitable operation should have a\r
+                 * count of 2; a non-waitable operation should have a count of 1. */\r
+                IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) );\r
+            }\r
+        }\r
+        else\r
+        {\r
+            /* An operation with no retry in the pending responses list should\r
+             * always have a job reference of 1. */\r
+            IotMqtt_Assert( pResult->u.operation.jobReference == 1 );\r
+\r
+            /* Increment job references of a waitable operation to prevent Wait from\r
+             * destroying this operation if it times out. */\r
+            if( waitable == true )\r
+            {\r
+                ( pResult->u.operation.jobReference )++;\r
+\r
+                IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.",\r
+                             pMqttConnection,\r
+                             IotMqtt_OperationType( type ),\r
+                             pResult,\r
+                             ( long int ) ( pResult->u.operation.jobReference - 1 ),\r
+                             ( long int ) ( pResult->u.operation.jobReference ) );\r
+            }\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    if( pResult != NULL )\r
+    {\r
+        IotLogDebug( "(MQTT connection %p) Found operation %s." ,\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( type ) );\r
+\r
+        /* Remove the matched operation from the list. */\r
+        IotListDouble_Remove( &( pResult->link ) );\r
+    }\r
+    else\r
+    {\r
+        IotLogDebug( "(MQTT connection %p) Operation %s not found.",\r
+                     pMqttConnection,\r
+                     IotMqtt_OperationType( type ) );\r
+    }\r
+\r
+    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    return pResult;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_Notify( _mqttOperation_t * pOperation )\r
+{\r
+    IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR;\r
+    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+    /* Check if operation is waitable. */\r
+    bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
+\r
+    /* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected\r
+     * subscriptions are removed by the deserializer, so not removed here. */\r
+    if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
+    {\r
+        switch( pOperation->u.operation.status )\r
+        {\r
+            case IOT_MQTT_SUCCESS:\r
+                break;\r
+\r
+            case IOT_MQTT_SERVER_REFUSED:\r
+                break;\r
+\r
+            default:\r
+                _IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection,\r
+                                                     pOperation->u.operation.packetIdentifier,\r
+                                                     -1 );\r
+                break;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Schedule callback invocation for non-waitable operation. */\r
+    if( waitable == false )\r
+    {\r
+        /* Non-waitable operation should have job reference of 1. */\r
+        IotMqtt_Assert( pOperation->u.operation.jobReference == 1 );\r
+\r
+        /* Schedule an invocation of the callback. */\r
+        if( pOperation->u.operation.notify.callback.function != NULL )\r
+        {\r
+            IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+            status = _IotMqtt_ScheduleOperation( pOperation,\r
+                                                 _IotMqtt_ProcessCompletedOperation,\r
+                                                 0 );\r
+\r
+            if( status == IOT_MQTT_SUCCESS )\r
+            {\r
+                IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.",\r
+                             pOperation->pMqttConnection,\r
+                             IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                             pOperation );\r
+\r
+                /* Place the scheduled operation back in the list of operations pending\r
+                 * processing. */\r
+                if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
+                {\r
+                    IotListDouble_Remove( &( pOperation->link ) );\r
+                }\r
+                else\r
+                {\r
+                    EMPTY_ELSE_MARKER;\r
+                }\r
+\r
+                IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
+                                          &( pOperation->link ) );\r
+            }\r
+            else\r
+            {\r
+                IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.",\r
+                            pOperation->pMqttConnection,\r
+                            IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                            pOperation );\r
+            }\r
+\r
+            IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Operations that weren't scheduled may be destroyed. */\r
+    if( status == IOT_MQTT_SCHEDULING_ERROR )\r
+    {\r
+        /* Decrement reference count of operations not scheduled. */\r
+        if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )\r
+        {\r
+            _IotMqtt_DestroyOperation( pOperation );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        /* Post to a waitable operation's semaphore. */\r
+        if( waitable == true )\r
+        {\r
+            IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "\r
+                         "notified of completion.",\r
+                         pOperation->pMqttConnection,\r
+                         IotMqtt_OperationType( pOperation->u.operation.type ),\r
+                         pOperation );\r
+\r
+            IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        IotMqtt_Assert( status == IOT_MQTT_SUCCESS );\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r