]> git.sur5r.net Git - freertos/blobdiff - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
Rename \FreeRTOS-Plus\Source\FreeRTOS-Plus-IoT-SDK to \FreeRTOS-Plus\Source\FreeRTOS...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_api.c
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
new file mode 100644 (file)
index 0000000..7df5326
--- /dev/null
@@ -0,0 +1,2018 @@
+/*\r
+ * Amazon FreeRTOS MQTT V2.0.0\r
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ *\r
+ * http://aws.amazon.com/freertos\r
+ * http://www.FreeRTOS.org\r
+ */\r
+\r
+/**\r
+ * @file iot_mqtt_api.c\r
+ * @brief Implements most user-facing functions of the MQTT library.\r
+ */\r
+\r
+/* The config header is always included first. */\r
+#include "iot_config.h"\r
+\r
+/* Standard includes. */\r
+#include <string.h>\r
+\r
+/* Error handling include. */\r
+#include "private/iot_error.h"\r
+\r
+/* MQTT internal include. */\r
+#include "private/iot_mqtt_internal.h"\r
+\r
+/* Platform layer includes. */\r
+#include "platform/iot_clock.h"\r
+#include "platform/iot_threads.h"\r
+\r
+/* Validate MQTT configuration settings. */\r
+#if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1\r
+    #error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1."\r
+#endif\r
+#if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1\r
+    #error "IOT_MQTT_ENABLE_METRICS must be 0 or 1."\r
+#endif\r
+#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1\r
+    #error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1."\r
+#endif\r
+#if IOT_MQTT_RESPONSE_WAIT_MS <= 0\r
+    #error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative."\r
+#endif\r
+#if IOT_MQTT_RETRY_MS_CEILING <= 0\r
+    #error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative."\r
+#endif\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief Set the unsubscribed flag of an MQTT subscription.\r
+ *\r
+ * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
+ * @param[in] pMatch Not used.\r
+ *\r
+ * @return Always returns `true`.\r
+ */\r
+static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
+                                              void * pMatch );\r
+\r
+/**\r
+ * @brief Destroy an MQTT subscription if its reference count is 0.\r
+ *\r
+ * @param[in] pData The subscription to destroy. This parameter is of type\r
+ * `void*` for compatibility with [free]\r
+ * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
+ */\r
+static void _mqttSubscription_tryDestroy( void * pData );\r
+\r
+/**\r
+ * @brief Decrement the reference count of an MQTT operation and attempt to\r
+ * destroy it.\r
+ *\r
+ * @param[in] pData The operation data to destroy. This parameter is of type\r
+ * `void*` for compatibility with [free]\r
+ * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
+ */\r
+static void _mqttOperation_tryDestroy( void * pData );\r
+\r
+/**\r
+ * @brief Create a keep-alive job for an MQTT connection.\r
+ *\r
+ * @param[in] pNetworkInfo User-provided network information for the new\r
+ * connection.\r
+ * @param[in] keepAliveSeconds User-provided keep-alive interval.\r
+ * @param[out] pMqttConnection The MQTT connection associated with the keep-alive.\r
+ *\r
+ * @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
+ */\r
+static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                 uint16_t keepAliveSeconds,\r
+                                 _mqttConnection_t * pMqttConnection );\r
+\r
+/**\r
+ * @brief Creates a new MQTT connection and initializes its members.\r
+ *\r
+ * @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server.\r
+ * @param[in] pNetworkInfo User-provided network information for the new\r
+ * connection.\r
+ * @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection.\r
+ *\r
+ * @return Pointer to a newly-created MQTT connection; `NULL` on failure.\r
+ */\r
+static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
+                                                  const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                                  uint16_t keepAliveSeconds );\r
+\r
+/**\r
+ * @brief Destroys the members of an MQTT connection.\r
+ *\r
+ * @param[in] pMqttConnection Which connection to destroy.\r
+ */\r
+static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection );\r
+\r
+/**\r
+ * @brief The common component of both @ref mqtt_function_subscribe and @ref\r
+ * mqtt_function_unsubscribe.\r
+ *\r
+ * See @ref mqtt_function_subscribe or @ref mqtt_function_unsubscribe for a\r
+ * description of the parameters and return values.\r
+ */\r
+static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
+                                           IotMqttConnection_t mqttConnection,\r
+                                           const IotMqttSubscription_t * pSubscriptionList,\r
+                                           size_t subscriptionCount,\r
+                                           uint32_t flags,\r
+                                           const IotMqttCallbackInfo_t * pCallbackInfo,\r
+                                           IotMqttOperation_t * pOperationReference );\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
+                                              void * pMatch )\r
+{\r
+    /* Because this function is called from a container function, the given link\r
+     * must never be NULL. */\r
+    IotMqtt_Assert( pSubscriptionLink != NULL );\r
+\r
+    _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
+                                                             pSubscriptionLink,\r
+                                                             link );\r
+\r
+    /* Silence warnings about unused parameters. */\r
+    ( void ) pMatch;\r
+\r
+    /* Set the unsubscribed flag. */\r
+    pSubscription->unsubscribed = true;\r
+\r
+    return true;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _mqttSubscription_tryDestroy( void * pData )\r
+{\r
+    _mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData;\r
+\r
+    /* Reference count must not be negative. */\r
+    IotMqtt_Assert( pSubscription->references >= 0 );\r
+\r
+    /* Unsubscribed flag should be set. */\r
+    IotMqtt_Assert( pSubscription->unsubscribed == true );\r
+\r
+    /* Free the subscription if it has no references. */\r
+    if( pSubscription->references == 0 )\r
+    {\r
+        IotMqtt_FreeSubscription( pSubscription );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _mqttOperation_tryDestroy( void * pData )\r
+{\r
+    _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData;\r
+    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+\r
+    /* Incoming PUBLISH operations may always be freed. */\r
+    if( pOperation->incomingPublish == true )\r
+    {\r
+        /* Cancel the incoming PUBLISH operation's job. */\r
+        taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
+                                                pOperation->job,\r
+                                                NULL );\r
+\r
+        /* If the operation's job was not canceled, it must be already executing.\r
+         * Any other return value is invalid. */\r
+        IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
+                        ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
+\r
+        /* Check if the incoming PUBLISH job was canceled. */\r
+        if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
+        {\r
+            /* Job was canceled. Process incoming PUBLISH now to clean up. */\r
+            _IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL,\r
+                                             pOperation->job,\r
+                                             pOperation );\r
+        }\r
+        else\r
+        {\r
+            /* The executing job will process the PUBLISH, so nothing is done here. */\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        /* Decrement reference count and destroy operation if possible. */\r
+        if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true )\r
+        {\r
+            _IotMqtt_DestroyOperation( pOperation );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                 uint16_t keepAliveSeconds,\r
+                                 _mqttConnection_t * pMqttConnection )\r
+{\r
+    bool status = true;\r
+    IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
+    IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS;\r
+\r
+    /* Network information is not used when MQTT packet serializers are disabled. */\r
+    ( void ) pNetworkInfo;\r
+\r
+    /* Default PINGREQ serializer function. */\r
+    IotMqttError_t ( * serializePingreq )( uint8_t **,\r
+                                           size_t * ) = _IotMqtt_SerializePingreq;\r
+\r
+    /* Convert the keep-alive interval to milliseconds. */\r
+    pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;\r
+    pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+\r
+    /* Choose a PINGREQ serializer function. */\r
+    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+        if( pNetworkInfo->pMqttSerializer != NULL )\r
+        {\r
+            if( pNetworkInfo->pMqttSerializer->serialize.pingreq != NULL )\r
+            {\r
+                serializePingreq = pNetworkInfo->pMqttSerializer->serialize.pingreq;\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+    /* Generate a PINGREQ packet. */\r
+    serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),\r
+                                        &( pMqttConnection->pingreqPacketSize ) );\r
+\r
+    if( serializeStatus != IOT_MQTT_SUCCESS )\r
+    {\r
+        IotLogError( "Failed to allocate PINGREQ packet for new connection." );\r
+\r
+        status = false;\r
+    }\r
+    else\r
+    {\r
+        /* Create the task pool job that processes keep-alive. */\r
+        jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
+                                           pMqttConnection,\r
+                                           &( pMqttConnection->keepAliveJobStorage ),\r
+                                           &( pMqttConnection->keepAliveJob ) );\r
+\r
+        /* Task pool job creation for a pre-allocated job should never fail.\r
+         * Abort the program if it does. */\r
+        if( jobStatus != IOT_TASKPOOL_SUCCESS )\r
+        {\r
+            IotLogError( "Failed to create keep-alive job for new connection." );\r
+\r
+            IotMqtt_Assert( false );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        /* Keep-alive references its MQTT connection, so increment reference. */\r
+        ( pMqttConnection->references )++;\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
+                                                  const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                                  uint16_t keepAliveSeconds )\r
+{\r
+    IOT_FUNCTION_ENTRY( bool, true );\r
+    _mqttConnection_t * pMqttConnection = NULL;\r
+    bool referencesMutexCreated = false, subscriptionMutexCreated = false;\r
+\r
+    /* Allocate memory for the new MQTT connection. */\r
+    pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) );\r
+\r
+    if( pMqttConnection == NULL )\r
+    {\r
+        IotLogError( "Failed to allocate memory for new connection." );\r
+\r
+        IOT_SET_AND_GOTO_CLEANUP( false );\r
+    }\r
+    else\r
+    {\r
+        /* Clear the MQTT connection, then copy the MQTT server mode, network\r
+         * interface, and disconnect callback. */\r
+        ( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) );\r
+        pMqttConnection->awsIotMqttMode = awsIotMqttMode;\r
+        pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface;\r
+        pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback;\r
+\r
+        /* Start a new MQTT connection with a reference count of 1. */\r
+        pMqttConnection->references = 1;\r
+    }\r
+\r
+    /* Create the references mutex for a new connection. It is a recursive mutex. */\r
+    referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true );\r
+\r
+    if( referencesMutexCreated == false )\r
+    {\r
+        IotLogError( "Failed to create references mutex for new connection." );\r
+\r
+        IOT_SET_AND_GOTO_CLEANUP( false );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Create the subscription mutex for a new connection. */\r
+    subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false );\r
+\r
+    if( subscriptionMutexCreated == false )\r
+    {\r
+        IotLogError( "Failed to create subscription mutex for new connection." );\r
+\r
+        IOT_SET_AND_GOTO_CLEANUP( false );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Create the new connection's subscription and operation lists. */\r
+    IotListDouble_Create( &( pMqttConnection->subscriptionList ) );\r
+    IotListDouble_Create( &( pMqttConnection->pendingProcessing ) );\r
+    IotListDouble_Create( &( pMqttConnection->pendingResponse ) );\r
+\r
+    /* AWS IoT service limits set minimum and maximum values for keep-alive interval.\r
+     * Adjust the user-provided keep-alive interval based on these requirements. */\r
+    if( awsIotMqttMode == true )\r
+    {\r
+        if( keepAliveSeconds < AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE )\r
+        {\r
+            keepAliveSeconds = AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE;\r
+        }\r
+        else if( keepAliveSeconds > AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE )\r
+        {\r
+            keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;\r
+        }\r
+        else if( keepAliveSeconds == 0 )\r
+        {\r
+            keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check if keep-alive is active for this connection. */\r
+    if( keepAliveSeconds != 0 )\r
+    {\r
+        if( _createKeepAliveJob( pNetworkInfo,\r
+                                 keepAliveSeconds,\r
+                                 pMqttConnection ) == false )\r
+        {\r
+            IOT_SET_AND_GOTO_CLEANUP( false );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Clean up mutexes and connection if this function failed. */\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+    if( status == false )\r
+    {\r
+        if( subscriptionMutexCreated == true )\r
+        {\r
+            IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        if( referencesMutexCreated == true )\r
+        {\r
+            IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        if( pMqttConnection != NULL )\r
+        {\r
+            IotMqtt_FreeConnection( pMqttConnection );\r
+            pMqttConnection = NULL;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return pMqttConnection;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )\r
+{\r
+    IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
+\r
+    /* Clean up keep-alive if still allocated. */\r
+    if( pMqttConnection->keepAliveMs != 0 )\r
+    {\r
+        IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
+\r
+        _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+\r
+        /* Clear data about the keep-alive. */\r
+        pMqttConnection->keepAliveMs = 0;\r
+        pMqttConnection->pPingreqPacket = NULL;\r
+        pMqttConnection->pingreqPacketSize = 0;\r
+\r
+        /* Decrement reference count. */\r
+        pMqttConnection->references--;\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* A connection to be destroyed should have no keep-alive and at most 1\r
+     * reference. */\r
+    IotMqtt_Assert( pMqttConnection->references <= 1 );\r
+    IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );\r
+    IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );\r
+    IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );\r
+\r
+    /* Remove all subscriptions. */\r
+    IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
+    IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
+                                    _mqttSubscription_setUnsubscribe,\r
+                                    NULL,\r
+                                    _mqttSubscription_tryDestroy,\r
+                                    offsetof( _mqttSubscription_t, link ) );\r
+    IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+    /* Destroy an owned network connection. */\r
+    if( pMqttConnection->ownNetworkConnection == true )\r
+    {\r
+        networkStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );\r
+\r
+        if( networkStatus != IOT_NETWORK_SUCCESS )\r
+        {\r
+            IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",\r
+                        pMqttConnection );\r
+        }\r
+        else\r
+        {\r
+            IotLogInfo( "(MQTT connection %p) Network connection destroyed.",\r
+                        pMqttConnection );\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Destroy mutexes. */\r
+    IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
+    IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+    IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );\r
+\r
+    /* Free connection. */\r
+    IotMqtt_FreeConnection( pMqttConnection );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
+                                           IotMqttConnection_t mqttConnection,\r
+                                           const IotMqttSubscription_t * pSubscriptionList,\r
+                                           size_t subscriptionCount,\r
+                                           uint32_t flags,\r
+                                           const IotMqttCallbackInfo_t * pCallbackInfo,\r
+                                           IotMqttOperation_t * pOperationReference )\r
+{\r
+    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
+    _mqttOperation_t * pSubscriptionOperation = NULL;\r
+\r
+    /* Subscription serializer function. */\r
+    IotMqttError_t ( * serializeSubscription )( const IotMqttSubscription_t *,\r
+                                                size_t,\r
+                                                uint8_t **,\r
+                                                size_t *,\r
+                                                uint16_t * ) = NULL;\r
+\r
+    /* This function should only be called for subscribe or unsubscribe. */\r
+    IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||\r
+                    ( operation == IOT_MQTT_UNSUBSCRIBE ) );\r
+\r
+    /* Check that all elements in the subscription list are valid. */\r
+    if( _IotMqtt_ValidateSubscriptionList( operation,\r
+                                           mqttConnection->awsIotMqttMode,\r
+                                           pSubscriptionList,\r
+                                           subscriptionCount ) == false )\r
+    {\r
+        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check that a reference pointer is provided for a waitable operation. */\r
+    if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
+    {\r
+        if( pOperationReference == NULL )\r
+        {\r
+            IotLogError( "Reference must be provided for a waitable %s.",\r
+                         IotMqtt_OperationType( operation ) );\r
+\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Choose a subscription serialize function. */\r
+    if( operation == IOT_MQTT_SUBSCRIBE )\r
+    {\r
+        serializeSubscription = _IotMqtt_SerializeSubscribe;\r
+\r
+        #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+            if( mqttConnection->pSerializer != NULL )\r
+            {\r
+                if( mqttConnection->pSerializer->serialize.subscribe != NULL )\r
+                {\r
+                    serializeSubscription = mqttConnection->pSerializer->serialize.subscribe;\r
+                }\r
+                else\r
+                {\r
+                    EMPTY_ELSE_MARKER;\r
+                }\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+    }\r
+    else\r
+    {\r
+        serializeSubscription = _IotMqtt_SerializeUnsubscribe;\r
+\r
+        #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+            if( mqttConnection->pSerializer != NULL )\r
+            {\r
+                if( mqttConnection->pSerializer->serialize.unsubscribe != NULL )\r
+                {\r
+                    serializeSubscription = mqttConnection->pSerializer->serialize.unsubscribe;\r
+                }\r
+                else\r
+                {\r
+                    EMPTY_ELSE_MARKER;\r
+                }\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+    }\r
+\r
+    /* Remove the MQTT subscription list for an UNSUBSCRIBE. */\r
+    if( operation == IOT_MQTT_UNSUBSCRIBE )\r
+    {\r
+        _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,\r
+                                                  pSubscriptionList,\r
+                                                  subscriptionCount );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Create a subscription operation. */\r
+    status = _IotMqtt_CreateOperation( mqttConnection,\r
+                                       flags,\r
+                                       pCallbackInfo,\r
+                                       &pSubscriptionOperation );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Check the subscription operation data and set the operation type. */\r
+    IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+    IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );\r
+    pSubscriptionOperation->u.operation.type = operation;\r
+\r
+    /* Generate a subscription packet from the subscription list. */\r
+    status = serializeSubscription( pSubscriptionList,\r
+                                    subscriptionCount,\r
+                                    &( pSubscriptionOperation->u.operation.pMqttPacket ),\r
+                                    &( pSubscriptionOperation->u.operation.packetSize ),\r
+                                    &( pSubscriptionOperation->u.operation.packetIdentifier ) );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Check the serialized MQTT packet. */\r
+    IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );\r
+    IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );\r
+\r
+    /* Add the subscription list for a SUBSCRIBE. */\r
+    if( operation == IOT_MQTT_SUBSCRIBE )\r
+    {\r
+        status = _IotMqtt_AddSubscriptions( mqttConnection,\r
+                                            pSubscriptionOperation->u.operation.packetIdentifier,\r
+                                            pSubscriptionList,\r
+                                            subscriptionCount );\r
+\r
+        if( status != IOT_MQTT_SUCCESS )\r
+        {\r
+            IOT_GOTO_CLEANUP();\r
+        }\r
+    }\r
+\r
+    /* Set the reference, if provided. */\r
+    if( pOperationReference != NULL )\r
+    {\r
+        *pOperationReference = pSubscriptionOperation;\r
+    }\r
+\r
+    /* Schedule the subscription operation for network transmission. */\r
+    status = _IotMqtt_ScheduleOperation( pSubscriptionOperation,\r
+                                         _IotMqtt_ProcessSend,\r
+                                         0 );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",\r
+                     mqttConnection,\r
+                     IotMqtt_OperationType( operation ) );\r
+\r
+        if( operation == IOT_MQTT_SUBSCRIBE )\r
+        {\r
+            _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,\r
+                                                 pSubscriptionOperation->u.operation.packetIdentifier,\r
+                                                 -1 );\r
+        }\r
+\r
+        /* Clear the previously set (and now invalid) reference. */\r
+        if( pOperationReference != NULL )\r
+        {\r
+            *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;\r
+        }\r
+\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Clean up if this function failed. */\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        if( pSubscriptionOperation != NULL )\r
+        {\r
+            _IotMqtt_DestroyOperation( pSubscriptionOperation );\r
+        }\r
+    }\r
+    else\r
+    {\r
+        status = IOT_MQTT_STATUS_PENDING;\r
+\r
+        IotLogInfo( "(MQTT connection %p) %s operation scheduled.",\r
+                    mqttConnection,\r
+                    IotMqtt_OperationType( operation ) );\r
+    }\r
+\r
+    IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
+{\r
+    bool disconnected = false;\r
+\r
+    /* Lock the mutex protecting the reference count. */\r
+    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    /* Reference count must not be negative. */\r
+    IotMqtt_Assert( pMqttConnection->references >= 0 );\r
+\r
+    /* Read connection status. */\r
+    disconnected = pMqttConnection->disconnected;\r
+\r
+    /* Increment the connection's reference count if it is not disconnected. */\r
+    if( disconnected == false )\r
+    {\r
+        ( pMqttConnection->references )++;\r
+        IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
+                     pMqttConnection,\r
+                     ( long int ) pMqttConnection->references - 1,\r
+                     ( long int ) pMqttConnection->references );\r
+    }\r
+    else\r
+    {\r
+        IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );\r
+    }\r
+\r
+    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    return( disconnected == false );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
+{\r
+    bool destroyConnection = false;\r
+\r
+    /* Lock the mutex protecting the reference count. */\r
+    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    /* Decrement reference count. It must not be negative. */\r
+    ( pMqttConnection->references )--;\r
+    IotMqtt_Assert( pMqttConnection->references >= 0 );\r
+\r
+    IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
+                 pMqttConnection,\r
+                 ( long int ) pMqttConnection->references + 1,\r
+                 ( long int ) pMqttConnection->references );\r
+\r
+    /* Check if this connection may be destroyed. */\r
+    if( pMqttConnection->references == 0 )\r
+    {\r
+        destroyConnection = true;\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+    /* Destroy an unreferenced MQTT connection. */\r
+    if( destroyConnection == true )\r
+    {\r
+        IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",\r
+                     pMqttConnection );\r
+        _destroyMqttConnection( pMqttConnection );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Init( void )\r
+{\r
+    IotMqttError_t status = IOT_MQTT_SUCCESS;\r
+\r
+    /* Call any additional serializer initialization function if serializer\r
+     * overrides are enabled. */\r
+    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+        #ifdef _IotMqtt_InitSerializeAdditional\r
+            if( _IotMqtt_InitSerializeAdditional() == false )\r
+            {\r
+                status = IOT_MQTT_INIT_FAILED;\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        #endif\r
+    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+    /* Log initialization status. */\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IotLogError( "Failed to initialize MQTT library serializer. " );\r
+    }\r
+    else\r
+    {\r
+        IotLogInfo( "MQTT library successfully initialized." );\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void IotMqtt_Cleanup( void )\r
+{\r
+    /* Call any additional serializer cleanup initialization function if serializer\r
+     * overrides are enabled. */\r
+    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+        #ifdef _IotMqtt_CleanupSerializeAdditional\r
+            _IotMqtt_CleanupSerializeAdditional();\r
+        #endif\r
+    #endif\r
+\r
+    IotLogInfo( "MQTT library cleanup done." );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                const IotMqttConnectInfo_t * pConnectInfo,\r
+                                uint32_t timeoutMs,\r
+                                IotMqttConnection_t * const pMqttConnection )\r
+{\r
+    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
+    bool networkCreated = false, ownNetworkConnection = false;\r
+    IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
+    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+    void * pNetworkConnection = NULL;\r
+    _mqttOperation_t * pOperation = NULL;\r
+    _mqttConnection_t * pNewMqttConnection = NULL;\r
+\r
+    /* Default CONNECT serializer function. */\r
+    IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,\r
+                                           uint8_t **,\r
+                                           size_t * ) = _IotMqtt_SerializeConnect;\r
+\r
+    /* Network info must not be NULL. */\r
+    if( pNetworkInfo == NULL )\r
+    {\r
+        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Validate network interface and connect info. */\r
+    if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )\r
+    {\r
+        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* If will info is provided, check that it is valid. */\r
+    if( pConnectInfo->pWillInfo != NULL )\r
+    {\r
+        if( _IotMqtt_ValidatePublish( pConnectInfo->awsIotMqttMode,\r
+                                      pConnectInfo->pWillInfo ) == false )\r
+        {\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+        }\r
+        else if( pConnectInfo->pWillInfo->payloadLength > UINT16_MAX )\r
+        {\r
+            /* Will message payloads cannot be larger than 65535. This restriction\r
+             * applies only to will messages, and not normal PUBLISH messages. */\r
+            IotLogError( "Will payload cannot be larger than 65535." );\r
+\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* If previous subscriptions are provided, check that they are valid. */\r
+    if( pConnectInfo->cleanSession == false )\r
+    {\r
+        if( pConnectInfo->pPreviousSubscriptions != NULL )\r
+        {\r
+            if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,\r
+                                                   pConnectInfo->awsIotMqttMode,\r
+                                                   pConnectInfo->pPreviousSubscriptions,\r
+                                                   pConnectInfo->previousSubscriptionCount ) == false )\r
+            {\r
+                IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Create a new MQTT connection if requested. Otherwise, copy the existing\r
+     * network connection. */\r
+    if( pNetworkInfo->createNetworkConnection == true )\r
+    {\r
+        networkStatus = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,\r
+                                                                 pNetworkInfo->u.setup.pNetworkCredentialInfo,\r
+                                                                 &pNetworkConnection );\r
+\r
+        if( networkStatus == IOT_NETWORK_SUCCESS )\r
+        {\r
+            networkCreated = true;\r
+\r
+            /* This MQTT connection owns the network connection it created and\r
+             * should destroy it on cleanup. */\r
+            ownNetworkConnection = true;\r
+        }\r
+        else\r
+        {\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
+        }\r
+    }\r
+    else\r
+    {\r
+        pNetworkConnection = pNetworkInfo->u.pNetworkConnection;\r
+        networkCreated = true;\r
+    }\r
+\r
+    IotLogInfo( "Establishing new MQTT connection." );\r
+\r
+    /* Initialize a new MQTT connection object. */\r
+    pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,\r
+                                                pNetworkInfo,\r
+                                                pConnectInfo->keepAliveSeconds );\r
+\r
+    if( pNewMqttConnection == NULL )\r
+    {\r
+        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
+    }\r
+    else\r
+    {\r
+        /* Set the network connection associated with the MQTT connection. */\r
+        pNewMqttConnection->pNetworkConnection = pNetworkConnection;\r
+        pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;\r
+\r
+        /* Set the MQTT packet serializer overrides. */\r
+        #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+            pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;\r
+        #endif\r
+    }\r
+\r
+    /* Set the MQTT receive callback. */\r
+    networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,\r
+                                                                               IotMqtt_ReceiveCallback,\r
+                                                                               pNewMqttConnection );\r
+\r
+    if( networkStatus != IOT_NETWORK_SUCCESS )\r
+    {\r
+        IotLogError( "Failed to set MQTT network receive callback." );\r
+\r
+        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Create a CONNECT operation. */\r
+    status = _IotMqtt_CreateOperation( pNewMqttConnection,\r
+                                       IOT_MQTT_FLAG_WAITABLE,\r
+                                       NULL,\r
+                                       &pOperation );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Ensure the members set by operation creation and serialization\r
+     * are appropriate for a blocking CONNECT. */\r
+    IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+    IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
+                    == IOT_MQTT_FLAG_WAITABLE );\r
+    IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+\r
+    /* Set the operation type. */\r
+    pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
+\r
+    /* Add previous session subscriptions. */\r
+    if( pConnectInfo->pPreviousSubscriptions != NULL )\r
+    {\r
+        /* Previous subscription count should have been validated as nonzero. */\r
+        IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );\r
+\r
+        status = _IotMqtt_AddSubscriptions( pNewMqttConnection,\r
+                                            2,\r
+                                            pConnectInfo->pPreviousSubscriptions,\r
+                                            pConnectInfo->previousSubscriptionCount );\r
+\r
+        if( status != IOT_MQTT_SUCCESS )\r
+        {\r
+            IOT_GOTO_CLEANUP();\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Choose a CONNECT serializer function. */\r
+    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+        if( pNewMqttConnection->pSerializer != NULL )\r
+        {\r
+            if( pNewMqttConnection->pSerializer->serialize.connect != NULL )\r
+            {\r
+                serializeConnect = pNewMqttConnection->pSerializer->serialize.connect;\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+    /* Convert the connect info and will info objects to an MQTT CONNECT packet. */\r
+    status = serializeConnect( pConnectInfo,\r
+                               &( pOperation->u.operation.pMqttPacket ),\r
+                               &( pOperation->u.operation.packetSize ) );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check the serialized MQTT packet. */\r
+    IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
+    IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
+\r
+    /* Add the CONNECT operation to the send queue for network transmission. */\r
+    status = _IotMqtt_ScheduleOperation( pOperation,\r
+                                         _IotMqtt_ProcessSend,\r
+                                         0 );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IotLogError( "Failed to enqueue CONNECT for sending." );\r
+    }\r
+    else\r
+    {\r
+        /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */\r
+        status = IotMqtt_Wait( pOperation,\r
+                               timeoutMs );\r
+\r
+        /* The call to wait cleans up the CONNECT operation, so set the pointer\r
+         * to NULL. */\r
+        pOperation = NULL;\r
+    }\r
+\r
+    /* When a connection is successfully established, schedule keep-alive job. */\r
+    if( status == IOT_MQTT_SUCCESS )\r
+    {\r
+        /* Check if a keep-alive job should be scheduled. */\r
+        if( pNewMqttConnection->keepAliveMs != 0 )\r
+        {\r
+            IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
+\r
+            taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
+                                                           pNewMqttConnection->keepAliveJob,\r
+                                                           pNewMqttConnection->nextKeepAliveMs );\r
+\r
+            if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
+            {\r
+                IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IotLogError( "Failed to establish new MQTT connection, error %s.",\r
+                     IotMqtt_strerror( status ) );\r
+\r
+        /* The network connection must be closed if it was created. */\r
+        if( networkCreated == true )\r
+        {\r
+            networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );\r
+\r
+            if( networkStatus != IOT_NETWORK_SUCCESS )\r
+            {\r
+                IotLogWarn( "Failed to close network connection." );\r
+            }\r
+            else\r
+            {\r
+                IotLogInfo( "Network connection closed on error." );\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        if( pOperation != NULL )\r
+        {\r
+            _IotMqtt_DestroyOperation( pOperation );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        if( pNewMqttConnection != NULL )\r
+        {\r
+            _destroyMqttConnection( pNewMqttConnection );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        IotLogInfo( "New MQTT connection %p established.", pMqttConnection );\r
+\r
+        /* Set the output parameter. */\r
+        *pMqttConnection = pNewMqttConnection;\r
+    }\r
+\r
+    IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,\r
+                         uint32_t flags )\r
+{\r
+    bool disconnected = false;\r
+    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+    _mqttOperation_t * pOperation = NULL;\r
+\r
+    IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );\r
+\r
+    /* Read the connection status. */\r
+    IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
+    disconnected = mqttConnection->disconnected;\r
+    IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
+\r
+    /* Only send a DISCONNECT packet if the connection is active and the "cleanup only"\r
+     * flag is not set. */\r
+    if( disconnected == false )\r
+    {\r
+        if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == 0 )\r
+        {\r
+            /* Create a DISCONNECT operation. This function blocks until the DISCONNECT\r
+             * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */\r
+            status = _IotMqtt_CreateOperation( mqttConnection,\r
+                                               IOT_MQTT_FLAG_WAITABLE,\r
+                                               NULL,\r
+                                               &pOperation );\r
+\r
+            if( status == IOT_MQTT_SUCCESS )\r
+            {\r
+                /* Ensure that the members set by operation creation and serialization\r
+                 * are appropriate for a blocking DISCONNECT. */\r
+                IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+                IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
+                                == IOT_MQTT_FLAG_WAITABLE );\r
+                IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+\r
+                /* Set the operation type. */\r
+                pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
+\r
+                /* Choose a disconnect serializer. */\r
+                IotMqttError_t ( * serializeDisconnect )( uint8_t **,\r
+                                                          size_t * ) = _IotMqtt_SerializeDisconnect;\r
+\r
+                #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+                    if( mqttConnection->pSerializer != NULL )\r
+                    {\r
+                        if( mqttConnection->pSerializer->serialize.disconnect != NULL )\r
+                        {\r
+                            serializeDisconnect = mqttConnection->pSerializer->serialize.disconnect;\r
+                        }\r
+                        else\r
+                        {\r
+                            EMPTY_ELSE_MARKER;\r
+                        }\r
+                    }\r
+                    else\r
+                    {\r
+                        EMPTY_ELSE_MARKER;\r
+                    }\r
+                #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+                /* Generate a DISCONNECT packet. */\r
+                status = serializeDisconnect( &( pOperation->u.operation.pMqttPacket ),\r
+                                              &( pOperation->u.operation.packetSize ) );\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+\r
+            if( status == IOT_MQTT_SUCCESS )\r
+            {\r
+                /* Check the serialized MQTT packet. */\r
+                IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
+                IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
+\r
+                /* Schedule the DISCONNECT operation for network transmission. */\r
+                if( _IotMqtt_ScheduleOperation( pOperation,\r
+                                                _IotMqtt_ProcessSend,\r
+                                                0 ) != IOT_MQTT_SUCCESS )\r
+                {\r
+                    IotLogWarn( "(MQTT connection %p) Failed to schedule DISCONNECT for sending.",\r
+                                mqttConnection );\r
+                    _IotMqtt_DestroyOperation( pOperation );\r
+                }\r
+                else\r
+                {\r
+                    /* Wait a short time for the DISCONNECT packet to be transmitted. */\r
+                    status = IotMqtt_Wait( pOperation,\r
+                                           IOT_MQTT_RESPONSE_WAIT_MS );\r
+\r
+                    /* A wait on DISCONNECT should only ever return SUCCESS, TIMEOUT,\r
+                     * or NETWORK ERROR. */\r
+                    if( status == IOT_MQTT_SUCCESS )\r
+                    {\r
+                        IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );\r
+                    }\r
+                    else\r
+                    {\r
+                        IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||\r
+                                        ( status == IOT_MQTT_NETWORK_ERROR ) );\r
+\r
+                        IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",\r
+                                    mqttConnection,\r
+                                    IotMqtt_strerror( status ) );\r
+                    }\r
+                }\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Close the underlying network connection. This also cleans up keep-alive. */\r
+    _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,\r
+                                     mqttConnection );\r
+\r
+    /* Check if the connection may be destroyed. */\r
+    IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
+\r
+    /* At this point, the connection should be marked disconnected. */\r
+    IotMqtt_Assert( mqttConnection->disconnected == true );\r
+\r
+    /* Attempt cancel and destroy each operation in the connection's lists. */\r
+    IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),\r
+                             _mqttOperation_tryDestroy,\r
+                             offsetof( _mqttOperation_t, link ) );\r
+\r
+    IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),\r
+                             _mqttOperation_tryDestroy,\r
+                             offsetof( _mqttOperation_t, link ) );\r
+\r
+    IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
+\r
+    /* Decrement the connection reference count and destroy it if possible. */\r
+    _IotMqtt_DecrementConnectionReferences( mqttConnection );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,\r
+                                  const IotMqttSubscription_t * pSubscriptionList,\r
+                                  size_t subscriptionCount,\r
+                                  uint32_t flags,\r
+                                  const IotMqttCallbackInfo_t * pCallbackInfo,\r
+                                  IotMqttOperation_t * pSubscribeOperation )\r
+{\r
+    return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
+                                mqttConnection,\r
+                                pSubscriptionList,\r
+                                subscriptionCount,\r
+                                flags,\r
+                                pCallbackInfo,\r
+                                pSubscribeOperation );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_TimedSubscribe( IotMqttConnection_t mqttConnection,\r
+                                       const IotMqttSubscription_t * pSubscriptionList,\r
+                                       size_t subscriptionCount,\r
+                                       uint32_t flags,\r
+                                       uint32_t timeoutMs )\r
+{\r
+    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+    IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
+\r
+    /* Flags are not used, but the parameter is present for future compatibility. */\r
+    ( void ) flags;\r
+\r
+    /* Call the asynchronous SUBSCRIBE function. */\r
+    status = IotMqtt_Subscribe( mqttConnection,\r
+                                pSubscriptionList,\r
+                                subscriptionCount,\r
+                                IOT_MQTT_FLAG_WAITABLE,\r
+                                NULL,\r
+                                &subscribeOperation );\r
+\r
+    /* Wait for the SUBSCRIBE operation to complete. */\r
+    if( status == IOT_MQTT_STATUS_PENDING )\r
+    {\r
+        status = IotMqtt_Wait( subscribeOperation, timeoutMs );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Ensure that a status was set. */\r
+    IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,\r
+                                    const IotMqttSubscription_t * pSubscriptionList,\r
+                                    size_t subscriptionCount,\r
+                                    uint32_t flags,\r
+                                    const IotMqttCallbackInfo_t * pCallbackInfo,\r
+                                    IotMqttOperation_t * pUnsubscribeOperation )\r
+{\r
+    return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
+                                mqttConnection,\r
+                                pSubscriptionList,\r
+                                subscriptionCount,\r
+                                flags,\r
+                                pCallbackInfo,\r
+                                pUnsubscribeOperation );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_TimedUnsubscribe( IotMqttConnection_t mqttConnection,\r
+                                         const IotMqttSubscription_t * pSubscriptionList,\r
+                                         size_t subscriptionCount,\r
+                                         uint32_t flags,\r
+                                         uint32_t timeoutMs )\r
+{\r
+    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+    IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
+\r
+    /* Flags are not used, but the parameter is present for future compatibility. */\r
+    ( void ) flags;\r
+\r
+    /* Call the asynchronous UNSUBSCRIBE function. */\r
+    status = IotMqtt_Unsubscribe( mqttConnection,\r
+                                  pSubscriptionList,\r
+                                  subscriptionCount,\r
+                                  IOT_MQTT_FLAG_WAITABLE,\r
+                                  NULL,\r
+                                  &unsubscribeOperation );\r
+\r
+    /* Wait for the UNSUBSCRIBE operation to complete. */\r
+    if( status == IOT_MQTT_STATUS_PENDING )\r
+    {\r
+        status = IotMqtt_Wait( unsubscribeOperation, timeoutMs );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Ensure that a status was set. */\r
+    IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,\r
+                                const IotMqttPublishInfo_t * pPublishInfo,\r
+                                uint32_t flags,\r
+                                const IotMqttCallbackInfo_t * pCallbackInfo,\r
+                                IotMqttOperation_t * pPublishOperation )\r
+{\r
+    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
+    _mqttOperation_t * pOperation = NULL;\r
+    uint8_t ** pPacketIdentifierHigh = NULL;\r
+\r
+    /* Default PUBLISH serializer function. */\r
+    IotMqttError_t ( * serializePublish )( const IotMqttPublishInfo_t *,\r
+                                           uint8_t **,\r
+                                           size_t *,\r
+                                           uint16_t *,\r
+                                           uint8_t ** ) = _IotMqtt_SerializePublish;\r
+\r
+    /* Check that the PUBLISH information is valid. */\r
+    if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,\r
+                                  pPublishInfo ) == false )\r
+    {\r
+        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check that no notification is requested for a QoS 0 publish. */\r
+    if( pPublishInfo->qos == IOT_MQTT_QOS_0 )\r
+    {\r
+        if( pCallbackInfo != NULL )\r
+        {\r
+            IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
+\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+        }\r
+        else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )\r
+        {\r
+            IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
+\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        if( pPublishOperation != NULL )\r
+        {\r
+            IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check that a reference pointer is provided for a waitable operation. */\r
+    if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
+    {\r
+        if( pPublishOperation == NULL )\r
+        {\r
+            IotLogError( "Reference must be provided for a waitable PUBLISH." );\r
+\r
+            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Create a PUBLISH operation. */\r
+    status = _IotMqtt_CreateOperation( mqttConnection,\r
+                                       flags,\r
+                                       pCallbackInfo,\r
+                                       &pOperation );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check the PUBLISH operation data and set the operation type. */\r
+    IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+    pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;\r
+\r
+    /* Choose a PUBLISH serializer function. */\r
+    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+        if( mqttConnection->pSerializer != NULL )\r
+        {\r
+            if( mqttConnection->pSerializer->serialize.publish != NULL )\r
+            {\r
+                serializePublish = mqttConnection->pSerializer->serialize.publish;\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+    /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */\r
+    if( mqttConnection->awsIotMqttMode == true )\r
+    {\r
+        pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Generate a PUBLISH packet from pPublishInfo. */\r
+    status = serializePublish( pPublishInfo,\r
+                               &( pOperation->u.operation.pMqttPacket ),\r
+                               &( pOperation->u.operation.packetSize ),\r
+                               &( pOperation->u.operation.packetIdentifier ),\r
+                               pPacketIdentifierHigh );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check the serialized MQTT packet. */\r
+    IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
+    IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
+\r
+    /* Initialize PUBLISH retry if retryLimit is set. */\r
+    if( pPublishInfo->retryLimit > 0 )\r
+    {\r
+        /* A QoS 0 PUBLISH may not be retried. */\r
+        if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
+        {\r
+            pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;\r
+            pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Set the reference, if provided. */\r
+    if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
+    {\r
+        if( pPublishOperation != NULL )\r
+        {\r
+            *pPublishOperation = pOperation;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Add the PUBLISH operation to the send queue for network transmission. */\r
+    status = _IotMqtt_ScheduleOperation( pOperation,\r
+                                         _IotMqtt_ProcessSend,\r
+                                         0 );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",\r
+                     mqttConnection );\r
+\r
+        /* Clear the previously set (and now invalid) reference. */\r
+        if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
+        {\r
+            if( pPublishOperation != NULL )\r
+            {\r
+                *pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Clean up the PUBLISH operation if this function fails. Otherwise, set the\r
+     * appropriate return code based on QoS. */\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        if( pOperation != NULL )\r
+        {\r
+            _IotMqtt_DestroyOperation( pOperation );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        if( pPublishInfo->qos > IOT_MQTT_QOS_0 )\r
+        {\r
+            status = IOT_MQTT_STATUS_PENDING;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",\r
+                    mqttConnection );\r
+    }\r
+\r
+    IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_TimedPublish( IotMqttConnection_t mqttConnection,\r
+                                     const IotMqttPublishInfo_t * pPublishInfo,\r
+                                     uint32_t flags,\r
+                                     uint32_t timeoutMs )\r
+{\r
+    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+    IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER,\r
+                       * pPublishOperation = NULL;\r
+\r
+    /* Clear the flags. */\r
+    flags = 0;\r
+\r
+    /* Set the waitable flag and reference for QoS 1 PUBLISH. */\r
+    if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
+    {\r
+        flags = IOT_MQTT_FLAG_WAITABLE;\r
+        pPublishOperation = &publishOperation;\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Call the asynchronous PUBLISH function. */\r
+    status = IotMqtt_Publish( mqttConnection,\r
+                              pPublishInfo,\r
+                              flags,\r
+                              NULL,\r
+                              pPublishOperation );\r
+\r
+    /* Wait for a queued QoS 1 PUBLISH to complete. */\r
+    if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
+    {\r
+        if( status == IOT_MQTT_STATUS_PENDING )\r
+        {\r
+            status = IotMqtt_Wait( publishOperation, timeoutMs );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,\r
+                             uint32_t timeoutMs )\r
+{\r
+    IotMqttError_t status = IOT_MQTT_SUCCESS;\r
+    _mqttConnection_t * pMqttConnection = operation->pMqttConnection;\r
+\r
+    /* Validate the given operation reference. */\r
+    if( _IotMqtt_ValidateOperation( operation ) == false )\r
+    {\r
+        status = IOT_MQTT_BAD_PARAMETER;\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Check the MQTT connection status. */\r
+    if( status == IOT_MQTT_SUCCESS )\r
+    {\r
+        IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+        if( pMqttConnection->disconnected == true )\r
+        {\r
+            IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "\r
+                         "Operation cannot be waited on.",\r
+                         pMqttConnection,\r
+                         IotMqtt_OperationType( operation->u.operation.type ),\r
+                         operation );\r
+\r
+            status = IOT_MQTT_NETWORK_ERROR;\r
+        }\r
+        else\r
+        {\r
+            IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",\r
+                        pMqttConnection,\r
+                        IotMqtt_OperationType( operation->u.operation.type ),\r
+                        operation );\r
+        }\r
+\r
+        IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+        /* Only wait on an operation if the MQTT connection is active. */\r
+        if( status == IOT_MQTT_SUCCESS )\r
+        {\r
+            if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),\r
+                                        timeoutMs ) == false )\r
+            {\r
+                status = IOT_MQTT_TIMEOUT;\r
+\r
+                /* Attempt to cancel the job of the timed out operation. */\r
+                ( void ) _IotMqtt_DecrementOperationReferences( operation, true );\r
+\r
+                /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */\r
+                if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
+                {\r
+                    IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"\r
+                                 " subscriptions of timed-out SUBSCRIBE.",\r
+                                 pMqttConnection,\r
+                                 operation );\r
+\r
+                    _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,\r
+                                                         operation->u.operation.packetIdentifier,\r
+                                                         -1 );\r
+                }\r
+                else\r
+                {\r
+                    EMPTY_ELSE_MARKER;\r
+                }\r
+            }\r
+            else\r
+            {\r
+                /* Retrieve the status of the completed operation. */\r
+                status = operation->u.operation.status;\r
+            }\r
+\r
+            IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",\r
+                        pMqttConnection,\r
+                        IotMqtt_OperationType( operation->u.operation.type ),\r
+                        operation,\r
+                        IotMqtt_strerror( status ) );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        /* Wait is finished; decrement operation reference count. */\r
+        if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )\r
+        {\r
+            _IotMqtt_DestroyOperation( operation );\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+const char * IotMqtt_strerror( IotMqttError_t status )\r
+{\r
+    const char * pMessage = NULL;\r
+\r
+    switch( status )\r
+    {\r
+        case IOT_MQTT_SUCCESS:\r
+            pMessage = "SUCCESS";\r
+            break;\r
+\r
+        case IOT_MQTT_STATUS_PENDING:\r
+            pMessage = "PENDING";\r
+            break;\r
+\r
+        case IOT_MQTT_INIT_FAILED:\r
+            pMessage = "INITIALIZATION FAILED";\r
+            break;\r
+\r
+        case IOT_MQTT_BAD_PARAMETER:\r
+            pMessage = "BAD PARAMETER";\r
+            break;\r
+\r
+        case IOT_MQTT_NO_MEMORY:\r
+            pMessage = "NO MEMORY";\r
+            break;\r
+\r
+        case IOT_MQTT_NETWORK_ERROR:\r
+            pMessage = "NETWORK ERROR";\r
+            break;\r
+\r
+        case IOT_MQTT_SCHEDULING_ERROR:\r
+            pMessage = "SCHEDULING ERROR";\r
+            break;\r
+\r
+        case IOT_MQTT_BAD_RESPONSE:\r
+            pMessage = "BAD RESPONSE RECEIVED";\r
+            break;\r
+\r
+        case IOT_MQTT_TIMEOUT:\r
+            pMessage = "TIMEOUT";\r
+            break;\r
+\r
+        case IOT_MQTT_SERVER_REFUSED:\r
+            pMessage = "SERVER REFUSED";\r
+            break;\r
+\r
+        case IOT_MQTT_RETRY_NO_RESPONSE:\r
+            pMessage = "NO RESPONSE";\r
+            break;\r
+\r
+        default:\r
+            pMessage = "INVALID STATUS";\r
+            break;\r
+    }\r
+\r
+    return pMessage;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+const char * IotMqtt_OperationType( IotMqttOperationType_t operation )\r
+{\r
+    const char * pMessage = NULL;\r
+\r
+    switch( operation )\r
+    {\r
+        case IOT_MQTT_CONNECT:\r
+            pMessage = "CONNECT";\r
+            break;\r
+\r
+        case IOT_MQTT_PUBLISH_TO_SERVER:\r
+            pMessage = "PUBLISH";\r
+            break;\r
+\r
+        case IOT_MQTT_PUBACK:\r
+            pMessage = "PUBACK";\r
+            break;\r
+\r
+        case IOT_MQTT_SUBSCRIBE:\r
+            pMessage = "SUBSCRIBE";\r
+            break;\r
+\r
+        case IOT_MQTT_UNSUBSCRIBE:\r
+            pMessage = "UNSUBSCRIBE";\r
+            break;\r
+\r
+        case IOT_MQTT_PINGREQ:\r
+            pMessage = "PINGREQ";\r
+            break;\r
+\r
+        case IOT_MQTT_DISCONNECT:\r
+            pMessage = "DISCONNECT";\r
+            break;\r
+\r
+        default:\r
+            pMessage = "INVALID OPERATION";\r
+            break;\r
+    }\r
+\r
+    return pMessage;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/* Provide access to internal functions and variables if testing. */\r
+#if IOT_BUILD_TESTS == 1\r
+    #include "iot_test_access_mqtt_api.c"\r
+#endif\r