]> git.sur5r.net Git - freertos/blobdiff - FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/aws/shadow/src/aws_iot_shadow_operation.c
Add the Labs projects provided in the V10.2.1_191129 zip file.
[freertos] / FreeRTOS-Labs / Source / FreeRTOS-IoT-Libraries / c_sdk / aws / shadow / src / aws_iot_shadow_operation.c
diff --git a/FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/aws/shadow/src/aws_iot_shadow_operation.c b/FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/aws/shadow/src/aws_iot_shadow_operation.c
new file mode 100644 (file)
index 0000000..27f7f91
--- /dev/null
@@ -0,0 +1,925 @@
+/*\r
+ * AWS IoT Shadow V2.1.0\r
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ */\r
+\r
+/**\r
+ * @file aws_iot_shadow_operation.c\r
+ * @brief Implements functions that process Shadow operations.\r
+ */\r
+\r
+/* The config header is always included first. */\r
+#include "iot_config.h"\r
+\r
+/* Standard includes. */\r
+#include <string.h>\r
+\r
+/* Shadow internal include. */\r
+#include "private/aws_iot_shadow_internal.h"\r
+\r
+/* Platform layer includes. */\r
+#include "platform/iot_threads.h"\r
+\r
+/* MQTT include. */\r
+#include "iot_mqtt.h"\r
+\r
+/* Error handling include. */\r
+#include "iot_error.h"\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief First parameter to #_shadowOperation_match.\r
+ */\r
+typedef struct _operationMatchParams\r
+{\r
+    _shadowOperationType_t type; /**< @brief DELETE, GET, or UPDATE. */\r
+    const char * pThingName;     /**< @brief Thing Name of Shadow operation. */\r
+    size_t thingNameLength;      /**< @brief Length of #_operationMatchParams_t.pThingName. */\r
+    const char * pDocument;      /**< @brief Shadow UPDATE response document. */\r
+    size_t documentLength;       /**< @brief Length of #_operationMatchParams_t.pDocument. */\r
+} _operationMatchParams_t;\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief Match a received Shadow response with a Shadow operation awaiting a\r
+ * response.\r
+ *\r
+ * @param[in] pOperationLink Pointer to the link member of the #_shadowOperation_t\r
+ * to check.\r
+ * @param[in] pMatch Pointer to an #_operationMatchParams_t.\r
+ *\r
+ * @return `true` if `pMatch` matches the received response; `false` otherwise.\r
+ */\r
+static bool _shadowOperation_match( const IotLink_t * pOperationLink,\r
+                                    void * pMatch );\r
+\r
+/**\r
+ * @brief Common function for processing received Shadow responses.\r
+ *\r
+ * @param[in] type DELETE, GET, or UPDATE.\r
+ * @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).\r
+ */\r
+static void _commonOperationCallback( _shadowOperationType_t type,\r
+                                      IotMqttCallbackParam_t * pMessage );\r
+\r
+/**\r
+ * @brief Invoked when a Shadow response is received for Shadow DELETE.\r
+ *\r
+ * @param[in] pArgument Ignored.\r
+ * @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).\r
+ */\r
+static void _deleteCallback( void * pArgument,\r
+                             IotMqttCallbackParam_t * pMessage );\r
+\r
+/**\r
+ * @brief Invoked when a Shadow response is received for a Shadow GET.\r
+ *\r
+ * @param[in] pArgument Ignored.\r
+ * @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).\r
+ */\r
+static void _getCallback( void * pArgument,\r
+                          IotMqttCallbackParam_t * pMessage );\r
+\r
+/**\r
+ * @brief Process an incoming Shadow document received when a Shadow GET is\r
+ * accepted.\r
+ *\r
+ * @param[in] pOperation The GET operation associated with the incoming Shadow\r
+ * document.\r
+ * @param[in] pPublishInfo The received Shadow document (as an MQTT PUBLISH\r
+ * message).\r
+ *\r
+ * @return #AWS_IOT_SHADOW_SUCCESS or #AWS_IOT_SHADOW_NO_MEMORY. Memory allocation\r
+ * only happens for a waitable `pOperation`.\r
+ */\r
+static AwsIotShadowError_t _processAcceptedGet( _shadowOperation_t * pOperation,\r
+                                                const IotMqttPublishInfo_t * pPublishInfo );\r
+\r
+/**\r
+ * @brief Invoked when a Shadow response is received for a Shadow UPDATE.\r
+ *\r
+ * @param[in] pArgument Ignored.\r
+ * @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).\r
+ */\r
+static void _updateCallback( void * pArgument,\r
+                             IotMqttCallbackParam_t * pMessage );\r
+\r
+/**\r
+ * @brief Notify of a completed Shadow operation.\r
+ *\r
+ * @param[in] pOperation The operation which completed.\r
+ *\r
+ * Depending on the parameters passed to a user-facing Shadow function, the\r
+ * notification will cause @ref shadow_function_wait to return or invoke a\r
+ * user-provided callback.\r
+ */\r
+static void _notifyCompletion( _shadowOperation_t * pOperation );\r
+\r
+/**\r
+ * @brief Get a Shadow subscription to use with a Shadow operation.\r
+ *\r
+ * This function may use an existing Shadow subscription, or it may allocate a\r
+ * new one.\r
+ *\r
+ * @param[in] pThingName Thing Name associated with operation.\r
+ * @param[in] thingNameLength Length of `pThingName`.\r
+ * @param[in] pTopicBuffer Contains the topic to use for subscribing.\r
+ * @param[in] operationTopicLength The length of the base topic in `pTopicBuffer`.\r
+ * @param[in] pOperation Shadow operation that needs a subscription.\r
+ * @param[out] pFreeTopicBuffer Whether the caller may free `pTopicBuffer`\r
+ * (which may be assigned to a subscription).\r
+ *\r
+ * @return #AWS_IOT_SHADOW_SUCCESS or #AWS_IOT_SHADOW_NO_MEMORY\r
+ */\r
+static AwsIotShadowError_t _findSubscription( const char * pThingName,\r
+                                              size_t thingNameLength,\r
+                                              char * pTopicBuffer,\r
+                                              uint16_t operationTopicLength,\r
+                                              _shadowOperation_t * pOperation,\r
+                                              bool * pFreeTopicBuffer );\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+#if LIBRARY_LOG_LEVEL > IOT_LOG_NONE\r
+\r
+/**\r
+ * @brief Printable names for each of the Shadow operations.\r
+ */\r
+    const char * const _pAwsIotShadowOperationNames[] =\r
+    {\r
+        "DELETE",\r
+        "GET",\r
+        "UPDATE",\r
+        "SET DELTA",\r
+        "SET UPDATED"\r
+    };\r
+#endif /* if LIBRARY_LOG_LEVEL > IOT_LOG_NONE */\r
+\r
+/**\r
+ * @brief List of active Shadow operations awaiting a response from the Shadow\r
+ * service.\r
+ */\r
+IotListDouble_t _AwsIotShadowPendingOperations = { 0 };\r
+\r
+/**\r
+ * @brief Protects #_AwsIotShadowPendingOperations from concurrent access.\r
+ */\r
+IotMutex_t _AwsIotShadowPendingOperationsMutex;\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _shadowOperation_match( const IotLink_t * pOperationLink,\r
+                                    void * pMatch )\r
+{\r
+    /* Because this function is called from a container function, the given link\r
+     * must never be NULL. */\r
+    AwsIotShadow_Assert( pOperationLink != NULL );\r
+\r
+    _shadowOperation_t * pOperation = IotLink_Container( _shadowOperation_t,\r
+                                                         pOperationLink,\r
+                                                         link );\r
+    _operationMatchParams_t * pParam = ( _operationMatchParams_t * ) pMatch;\r
+    _shadowSubscription_t * pSubscription = pOperation->pSubscription;\r
+    const char * pClientToken = NULL;\r
+    size_t clientTokenLength = 0;\r
+\r
+    /* Check for matching Thing Name and operation type. */\r
+    bool match = ( pOperation->type == pParam->type ) &&\r
+                 ( pParam->thingNameLength == pSubscription->thingNameLength ) &&\r
+                 ( strncmp( pParam->pThingName,\r
+                            pSubscription->pThingName,\r
+                            pParam->thingNameLength ) == 0 );\r
+\r
+    /* For a Shadow UPDATE operation, compare the client tokens. */\r
+    if( ( match == true ) && ( pOperation->type == SHADOW_UPDATE ) )\r
+    {\r
+        /* Check document pointers. */\r
+        AwsIotShadow_Assert( pParam->pDocument != NULL );\r
+        AwsIotShadow_Assert( pParam->documentLength > 0 );\r
+        AwsIotShadow_Assert( pOperation->u.update.pClientToken != NULL );\r
+        AwsIotShadow_Assert( pOperation->u.update.clientTokenLength > 0 );\r
+\r
+        IotLogDebug( "Verifying client tokens for Shadow UPDATE." );\r
+\r
+        /* Check for the client token in the UPDATE response document. */\r
+        match = AwsIot_GetClientToken( pParam->pDocument,\r
+                                       pParam->documentLength,\r
+                                       &pClientToken,\r
+                                       &clientTokenLength );\r
+\r
+        /* If the UPDATE response document has a client token, check that it\r
+         * matches. */\r
+        if( match == true )\r
+        {\r
+            match = ( clientTokenLength == pOperation->u.update.clientTokenLength ) &&\r
+                    ( strncmp( pClientToken,\r
+                               pOperation->u.update.pClientToken,\r
+                               clientTokenLength ) == 0 );\r
+        }\r
+        else\r
+        {\r
+            IotLogWarn( "Received a Shadow UPDATE response with no client token. "\r
+                        "This is possibly a response to a bad JSON document:\r\n%.*s",\r
+                        pParam->documentLength,\r
+                        pParam->pDocument );\r
+        }\r
+    }\r
+\r
+    return match;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _commonOperationCallback( _shadowOperationType_t type,\r
+                                      IotMqttCallbackParam_t * pMessage )\r
+{\r
+    _shadowOperation_t * pOperation = NULL;\r
+    IotLink_t * pOperationLink = NULL;\r
+    AwsIotStatus_t status = AWS_IOT_UNKNOWN;\r
+    _operationMatchParams_t param = { .type = ( _shadowOperationType_t ) 0 };\r
+    uint32_t flags = 0;\r
+\r
+    /* Set operation type to search. */\r
+    param.type = type;\r
+\r
+    /* Set the response document for a Shadow UPDATE. */\r
+    if( type == SHADOW_UPDATE )\r
+    {\r
+        param.pDocument = pMessage->u.message.info.pPayload;\r
+        param.documentLength = pMessage->u.message.info.payloadLength;\r
+    }\r
+\r
+    /* Parse the Thing Name from the MQTT topic name. */\r
+    if( AwsIot_ParseThingName( pMessage->u.message.info.pTopicName,\r
+                               pMessage->u.message.info.topicNameLength,\r
+                               &( param.pThingName ),\r
+                               &( param.thingNameLength ) ) == false )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Lock the pending operations list for exclusive access. */\r
+    IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+\r
+    /* Search for a matching pending operation. */\r
+    pOperationLink = IotListDouble_FindFirstMatch( &( _AwsIotShadowPendingOperations ),\r
+                                                   NULL,\r
+                                                   _shadowOperation_match,\r
+                                                   &param );\r
+\r
+    /* Find and remove the first Shadow operation of the given type. */\r
+    if( pOperationLink == NULL )\r
+    {\r
+        /* Operation is not pending. It may have already been processed. Return\r
+         * without doing anything */\r
+        IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+\r
+        IotLogWarn( "Shadow %s callback received an unknown operation.",\r
+                    _pAwsIotShadowOperationNames[ type ] );\r
+\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+    else\r
+    {\r
+        pOperation = IotLink_Container( _shadowOperation_t, pOperationLink, link );\r
+\r
+        /* Remove a non-waitable operation from the pending operation list.\r
+         * Waitable operations are removed by the Wait function. */\r
+        if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == 0 )\r
+        {\r
+            IotListDouble_Remove( &( pOperation->link ) );\r
+            IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+        }\r
+    }\r
+\r
+    /* Check that the Shadow operation type and status. */\r
+    AwsIotShadow_Assert( pOperation->type == type );\r
+    AwsIotShadow_Assert( pOperation->status == AWS_IOT_SHADOW_STATUS_PENDING );\r
+\r
+    IotLogDebug( "Received Shadow response on topic %.*s",\r
+                 pMessage->u.message.info.topicNameLength,\r
+                 pMessage->u.message.info.pTopicName );\r
+\r
+    /* Parse the status from the topic name. */\r
+    status = AwsIot_ParseStatus( pMessage->u.message.info.pTopicName,\r
+                                 pMessage->u.message.info.topicNameLength );\r
+\r
+    switch( status )\r
+    {\r
+        case AWS_IOT_ACCEPTED:\r
+            IotLogInfo( "Shadow %s of %.*s was ACCEPTED.",\r
+                        _pAwsIotShadowOperationNames[ type ],\r
+                        pOperation->pSubscription->thingNameLength,\r
+                        pOperation->pSubscription->pThingName );\r
+\r
+            /* Process the retrieved document for a Shadow GET. Otherwise, set\r
+             * status to success. */\r
+            if( type == SHADOW_GET )\r
+            {\r
+                pOperation->status = _processAcceptedGet( pOperation,\r
+                                                          &( pMessage->u.message.info ) );\r
+            }\r
+            else\r
+            {\r
+                pOperation->status = AWS_IOT_SHADOW_SUCCESS;\r
+            }\r
+\r
+            break;\r
+\r
+        case AWS_IOT_REJECTED:\r
+            IotLogWarn( "Shadow %s of %.*s was REJECTED.",\r
+                        _pAwsIotShadowOperationNames[ type ],\r
+                        pOperation->pSubscription->thingNameLength,\r
+                        pOperation->pSubscription->pThingName );\r
+\r
+            pOperation->status = _AwsIotShadow_ParseErrorDocument( pMessage->u.message.info.pPayload,\r
+                                                                   pMessage->u.message.info.payloadLength );\r
+            break;\r
+\r
+        default:\r
+            IotLogWarn( "Unknown status for %s of %.*s Shadow. Ignoring message.",\r
+                        _pAwsIotShadowOperationNames[ type ],\r
+                        pOperation->pSubscription->thingNameLength,\r
+                        pOperation->pSubscription->pThingName );\r
+\r
+            pOperation->status = AWS_IOT_SHADOW_BAD_RESPONSE;\r
+            break;\r
+    }\r
+\r
+    /* Copy the flags from the Shadow operation. The notify function may delete the operation. */\r
+    flags = pOperation->flags;\r
+\r
+    /* Notify of operation completion. */\r
+    _notifyCompletion( pOperation );\r
+\r
+    /* For waitable operations, unlock the pending operation list mutex to allow\r
+     * the Wait function to run. */\r
+    if( ( flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )\r
+    {\r
+        IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+    }\r
+\r
+    /* This function has no return value and no cleanup, but uses the cleanup\r
+     * label to exit on error. */\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _deleteCallback( void * pArgument,\r
+                             IotMqttCallbackParam_t * pMessage )\r
+{\r
+    /* Silence warnings about unused parameter. */\r
+    ( void ) pArgument;\r
+\r
+    _commonOperationCallback( SHADOW_DELETE, pMessage );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _getCallback( void * pArgument,\r
+                          IotMqttCallbackParam_t * pMessage )\r
+{\r
+    /* Silence warnings about unused parameter. */\r
+    ( void ) pArgument;\r
+\r
+    _commonOperationCallback( SHADOW_GET, pMessage );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static AwsIotShadowError_t _processAcceptedGet( _shadowOperation_t * pOperation,\r
+                                                const IotMqttPublishInfo_t * pPublishInfo )\r
+{\r
+    AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;\r
+\r
+    /* A non-waitable operation can re-use the pointers from the publish info,\r
+     * since those are guaranteed to be in-scope throughout the user callback.\r
+     * But a waitable operation must copy the data from the publish info because\r
+     * AwsIotShadow_Wait may be called after the MQTT library frees the publish\r
+     * info. */\r
+    if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == 0 )\r
+    {\r
+        pOperation->u.get.pDocument = pPublishInfo->pPayload;\r
+        pOperation->u.get.documentLength = pPublishInfo->payloadLength;\r
+    }\r
+    else\r
+    {\r
+        IotLogDebug( "Allocating new buffer for waitable Shadow GET." );\r
+\r
+        /* Parameter validation should not have allowed a NULL malloc function. */\r
+        AwsIotShadow_Assert( pOperation->u.get.mallocDocument != NULL );\r
+\r
+        /* Allocate a buffer for the retrieved document. */\r
+        pOperation->u.get.pDocument = pOperation->u.get.mallocDocument( pPublishInfo->payloadLength );\r
+\r
+        if( pOperation->u.get.pDocument == NULL )\r
+        {\r
+            IotLogError( "Failed to allocate buffer for retrieved Shadow document." );\r
+\r
+            status = AWS_IOT_SHADOW_NO_MEMORY;\r
+        }\r
+        else\r
+        {\r
+            /* Copy the retrieved document. */\r
+            ( void ) memcpy( ( void * ) pOperation->u.get.pDocument,\r
+                             pPublishInfo->pPayload,\r
+                             pPublishInfo->payloadLength );\r
+            pOperation->u.get.documentLength = pPublishInfo->payloadLength;\r
+        }\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _updateCallback( void * pArgument,\r
+                             IotMqttCallbackParam_t * pMessage )\r
+{\r
+    /* Silence warnings about unused parameter. */\r
+    ( void ) pArgument;\r
+\r
+    _commonOperationCallback( SHADOW_UPDATE, pMessage );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _notifyCompletion( _shadowOperation_t * pOperation )\r
+{\r
+    AwsIotShadowCallbackParam_t callbackParam = { .callbackType = ( AwsIotShadowCallbackType_t ) 0 };\r
+    _shadowSubscription_t * pSubscription = pOperation->pSubscription,\r
+                          * pRemovedSubscription = NULL;\r
+\r
+    /* If the operation is waiting, post to its wait semaphore and return. */\r
+    if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )\r
+    {\r
+        IotSemaphore_Post( &( pOperation->notify.waitSemaphore ) );\r
+    }\r
+    else\r
+    {\r
+        /* Decrement the reference count. This also removes subscriptions if the\r
+         * count reaches 0. */\r
+        IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );\r
+        _AwsIotShadow_DecrementReferences( pOperation,\r
+                                           pSubscription->pTopicBuffer,\r
+                                           &pRemovedSubscription );\r
+        IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );\r
+\r
+        /* Set the subscription pointer used for the user callback based on whether\r
+         * a subscription was removed from the list. */\r
+        if( pRemovedSubscription != NULL )\r
+        {\r
+            pSubscription = pRemovedSubscription;\r
+        }\r
+\r
+        AwsIotShadow_Assert( pSubscription != NULL );\r
+\r
+        /* Invoke the user callback if provided. */\r
+        if( pOperation->notify.callback.function != NULL )\r
+        {\r
+            /* Set the common members of the callback parameter. */\r
+            callbackParam.callbackType = ( AwsIotShadowCallbackType_t ) pOperation->type;\r
+            callbackParam.mqttConnection = pOperation->mqttConnection;\r
+            callbackParam.u.operation.result = pOperation->status;\r
+            callbackParam.u.operation.reference = pOperation;\r
+            callbackParam.pThingName = pSubscription->pThingName;\r
+            callbackParam.thingNameLength = pSubscription->thingNameLength;\r
+\r
+            /* Set the members of the callback parameter for a received document. */\r
+            if( pOperation->type == SHADOW_GET )\r
+            {\r
+                callbackParam.u.operation.get.pDocument = pOperation->u.get.pDocument;\r
+                callbackParam.u.operation.get.documentLength = pOperation->u.get.documentLength;\r
+            }\r
+\r
+            pOperation->notify.callback.function( pOperation->notify.callback.pCallbackContext,\r
+                                                  &callbackParam );\r
+        }\r
+\r
+        /* Destroy a removed subscription. */\r
+        if( pRemovedSubscription != NULL )\r
+        {\r
+            _AwsIotShadow_DestroySubscription( pRemovedSubscription );\r
+        }\r
+\r
+        _AwsIotShadow_DestroyOperation( pOperation );\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static AwsIotShadowError_t _findSubscription( const char * pThingName,\r
+                                              size_t thingNameLength,\r
+                                              char * pTopicBuffer,\r
+                                              uint16_t operationTopicLength,\r
+                                              _shadowOperation_t * pOperation,\r
+                                              bool * pFreeTopicBuffer )\r
+{\r
+    AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;\r
+    _shadowSubscription_t * pSubscription = NULL;\r
+\r
+    /* Lookup table for Shadow operation callbacks. */\r
+    const AwsIotMqttCallbackFunction_t shadowCallbacks[ SHADOW_OPERATION_COUNT ] =\r
+    {\r
+        _deleteCallback,\r
+        _getCallback,\r
+        _updateCallback\r
+    };\r
+\r
+    /* Lock the subscriptions mutex for exclusive access. */\r
+    IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );\r
+\r
+    /* Check for an existing subscription. This function will attempt to allocate\r
+     * a new subscription if not found. */\r
+    pSubscription = _AwsIotShadow_FindSubscription( pThingName,\r
+                                                    thingNameLength,\r
+                                                    true );\r
+\r
+    if( pSubscription == NULL )\r
+    {\r
+        status = AWS_IOT_SHADOW_NO_MEMORY;\r
+    }\r
+    else\r
+    {\r
+        /* Ensure that the subscription Thing Name matches. */\r
+        AwsIotShadow_Assert( pSubscription != NULL );\r
+        AwsIotShadow_Assert( pSubscription->thingNameLength == thingNameLength );\r
+        AwsIotShadow_Assert( strncmp( pSubscription->pThingName,\r
+                                      pThingName,\r
+                                      thingNameLength ) == 0 );\r
+\r
+        /* Set the subscription object for the Shadow operation. */\r
+        pOperation->pSubscription = pSubscription;\r
+\r
+        /* Assign the topic buffer to the subscription to use for unsubscribing if\r
+         * the subscription has no topic buffer. */\r
+        if( pSubscription->pTopicBuffer == NULL )\r
+        {\r
+            pSubscription->pTopicBuffer = pTopicBuffer;\r
+\r
+            /* Don't free the topic buffer if it was allocated to the subscription. */\r
+            *pFreeTopicBuffer = false;\r
+        }\r
+        else\r
+        {\r
+            *pFreeTopicBuffer = true;\r
+        }\r
+\r
+        /* Increment the reference count for this Shadow operation's\r
+         * subscriptions. */\r
+        status = _AwsIotShadow_IncrementReferences( pOperation,\r
+                                                    pTopicBuffer,\r
+                                                    operationTopicLength,\r
+                                                    shadowCallbacks[ pOperation->type ] );\r
+\r
+        if( status != AWS_IOT_SHADOW_SUCCESS )\r
+        {\r
+            /* Failed to add subscriptions for a Shadow operation. The reference\r
+             * count was not incremented. Check if this subscription should be\r
+             * deleted. */\r
+            _AwsIotShadow_RemoveSubscription( pSubscription, NULL );\r
+        }\r
+    }\r
+\r
+    /* Unlock the Shadow subscription list mutex. */\r
+    IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+AwsIotShadowError_t _AwsIotShadow_CreateOperation( _shadowOperation_t ** pNewOperation,\r
+                                                   _shadowOperationType_t type,\r
+                                                   uint32_t flags,\r
+                                                   const AwsIotShadowCallbackInfo_t * pCallbackInfo )\r
+{\r
+    IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_SUCCESS );\r
+    _shadowOperation_t * pOperation = NULL;\r
+\r
+    IotLogDebug( "Creating operation record for Shadow %s.",\r
+                 _pAwsIotShadowOperationNames[ type ] );\r
+\r
+    /* Allocate memory for a new Shadow operation. */\r
+    pOperation = AwsIotShadow_MallocOperation( sizeof( _shadowOperation_t ) );\r
+\r
+    if( pOperation == NULL )\r
+    {\r
+        IotLogError( "Failed to allocate memory for Shadow %s.",\r
+                     _pAwsIotShadowOperationNames[ type ] );\r
+\r
+        IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );\r
+    }\r
+\r
+    /* Clear the operation data. */\r
+    ( void ) memset( pOperation, 0x00, sizeof( _shadowOperation_t ) );\r
+\r
+    /* Check if the waitable flag is set. If it is, create a semaphore to\r
+     * wait on. */\r
+    if( ( flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )\r
+    {\r
+        if( IotSemaphore_Create( &( pOperation->notify.waitSemaphore ), 0, 1 ) == false )\r
+        {\r
+            IotLogError( "Failed to create semaphore for waitable Shadow %s.",\r
+                         _pAwsIotShadowOperationNames[ type ] );\r
+\r
+            IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );\r
+        }\r
+    }\r
+    else\r
+    {\r
+        /* If the waitable flag isn't set but a callback is, copy the callback\r
+         * information. */\r
+        if( pCallbackInfo != NULL )\r
+        {\r
+            pOperation->notify.callback = *pCallbackInfo;\r
+        }\r
+    }\r
+\r
+    /* Set the remaining common members of the Shadow operation. */\r
+    pOperation->type = type;\r
+    pOperation->flags = flags;\r
+    pOperation->status = AWS_IOT_SHADOW_STATUS_PENDING;\r
+\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+    if( status != AWS_IOT_SHADOW_SUCCESS )\r
+    {\r
+        if( pOperation != NULL )\r
+        {\r
+            AwsIotShadow_FreeOperation( pOperation );\r
+        }\r
+    }\r
+    else\r
+    {\r
+        /* Set the output parameter. */\r
+        *pNewOperation = pOperation;\r
+    }\r
+\r
+    IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _AwsIotShadow_DestroyOperation( void * pData )\r
+{\r
+    _shadowOperation_t * pOperation = ( _shadowOperation_t * ) pData;\r
+\r
+    /* The Shadow operation pointer must not be NULL. */\r
+    AwsIotShadow_Assert( pOperation != NULL );\r
+\r
+    IotLogDebug( "Destroying Shadow operation %s.",\r
+                 _pAwsIotShadowOperationNames[ pOperation->type ] );\r
+\r
+    /* Check if a wait semaphore was created for this operation. */\r
+    if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )\r
+    {\r
+        /* Destroy the wait semaphore */\r
+        IotSemaphore_Destroy( &( pOperation->notify.waitSemaphore ) );\r
+    }\r
+\r
+    /* If this is a Shadow update, free any allocated client token. */\r
+    if( ( pOperation->type == SHADOW_UPDATE ) &&\r
+        ( pOperation->u.update.pClientToken != NULL ) )\r
+    {\r
+        AwsIotShadow_Assert( pOperation->u.update.clientTokenLength > 0 );\r
+\r
+        AwsIotShadow_FreeString( ( void * ) ( pOperation->u.update.pClientToken ) );\r
+    }\r
+\r
+    /* Free the memory used to hold operation data. */\r
+    AwsIotShadow_FreeOperation( pOperation );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+AwsIotShadowError_t _AwsIotShadow_GenerateShadowTopic( _shadowOperationType_t type,\r
+                                                       const char * pThingName,\r
+                                                       size_t thingNameLength,\r
+                                                       char ** pTopicBuffer,\r
+                                                       uint16_t * pOperationTopicLength )\r
+{\r
+    AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;\r
+    AwsIotTopicInfo_t topicInfo = { 0 };\r
+\r
+    /* Lookup table for Shadow operation strings. */\r
+    const char * const pOperationString[ SHADOW_OPERATION_COUNT ] =\r
+    {\r
+        SHADOW_DELETE_OPERATION_STRING, /* Shadow delete operation. */\r
+        SHADOW_GET_OPERATION_STRING,    /* Shadow get operation. */\r
+        SHADOW_UPDATE_OPERATION_STRING  /* Shadow update operation. */\r
+    };\r
+\r
+    /* Lookup table for Shadow operation string lengths. */\r
+    const uint16_t pOperationStringLength[ SHADOW_OPERATION_COUNT ] =\r
+    {\r
+        SHADOW_DELETE_OPERATION_STRING_LENGTH, /* Shadow delete operation. */\r
+        SHADOW_GET_OPERATION_STRING_LENGTH,    /* Shadow get operation. */\r
+        SHADOW_UPDATE_OPERATION_STRING_LENGTH  /* Shadow update operation. */\r
+    };\r
+\r
+    /* Only Shadow delete, get, and update operation types should be passed to this\r
+     * function. */\r
+    AwsIotShadow_Assert( ( type == SHADOW_DELETE ) ||\r
+                         ( type == SHADOW_GET ) ||\r
+                         ( type == SHADOW_UPDATE ) );\r
+\r
+    /* Set the members needed to generate an operation topic. */\r
+    topicInfo.pThingName = pThingName;\r
+    topicInfo.thingNameLength = thingNameLength;\r
+    topicInfo.pOperationName = pOperationString[ type ];\r
+    topicInfo.operationNameLength = pOperationStringLength[ type ];\r
+    topicInfo.longestSuffixLength = SHADOW_LONGEST_SUFFIX_LENGTH;\r
+    topicInfo.mallocString = AwsIotShadow_MallocString;\r
+\r
+    if( AwsIot_GenerateOperationTopic( &topicInfo,\r
+                                       pTopicBuffer,\r
+                                       pOperationTopicLength ) == false )\r
+    {\r
+        status = AWS_IOT_SHADOW_NO_MEMORY;\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+AwsIotShadowError_t _AwsIotShadow_ProcessOperation( IotMqttConnection_t mqttConnection,\r
+                                                    const char * pThingName,\r
+                                                    size_t thingNameLength,\r
+                                                    _shadowOperation_t * pOperation,\r
+                                                    const AwsIotShadowDocumentInfo_t * pDocumentInfo )\r
+{\r
+    IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_STATUS_PENDING );\r
+    IotMqttError_t publishStatus = IOT_MQTT_STATUS_PENDING;\r
+    char * pTopicBuffer = NULL;\r
+    uint16_t operationTopicLength = 0;\r
+    bool freeTopicBuffer = true;\r
+    IotMqttPublishInfo_t publishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;\r
+\r
+    IotLogDebug( "Processing Shadow operation %s for Thing %.*s.",\r
+                 _pAwsIotShadowOperationNames[ pOperation->type ],\r
+                 thingNameLength,\r
+                 pThingName );\r
+\r
+    /* Set the operation's MQTT connection. */\r
+    pOperation->mqttConnection = mqttConnection;\r
+\r
+    /* Generate the operation topic buffer. */\r
+    status = _AwsIotShadow_GenerateShadowTopic( pOperation->type,\r
+                                                pThingName,\r
+                                                thingNameLength,\r
+                                                &pTopicBuffer,\r
+                                                &operationTopicLength );\r
+\r
+    if( status != AWS_IOT_SHADOW_SUCCESS )\r
+    {\r
+        IotLogError( "No memory for Shadow operation topic buffer." );\r
+\r
+        IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );\r
+    }\r
+\r
+    /* Get a subscription object for this Shadow operation. */\r
+    status = _findSubscription( pThingName,\r
+                                thingNameLength,\r
+                                pTopicBuffer,\r
+                                operationTopicLength,\r
+                                pOperation,\r
+                                &freeTopicBuffer );\r
+\r
+    if( status != AWS_IOT_SHADOW_SUCCESS )\r
+    {\r
+        /* No subscription was found and no subscription could be allocated. */\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Set the operation topic name. */\r
+    publishInfo.pTopicName = pTopicBuffer;\r
+    publishInfo.topicNameLength = operationTopicLength;\r
+\r
+    IotLogDebug( "Shadow %s message will be published to topic %.*s",\r
+                 _pAwsIotShadowOperationNames[ pOperation->type ],\r
+                 publishInfo.topicNameLength,\r
+                 publishInfo.pTopicName );\r
+\r
+    /* Set the document info if this operation is not a Shadow DELETE. */\r
+    if( pOperation->type != SHADOW_DELETE )\r
+    {\r
+        publishInfo.qos = pDocumentInfo->qos;\r
+        publishInfo.retryLimit = pDocumentInfo->retryLimit;\r
+        publishInfo.retryMs = pDocumentInfo->retryMs;\r
+\r
+        IotLogDebug( "Shadow %s message will be published at QoS %d with "\r
+                     "retryLimit %d and retryMs %llu.",\r
+                     _pAwsIotShadowOperationNames[ pOperation->type ],\r
+                     publishInfo.qos,\r
+                     publishInfo.retryLimit,\r
+                     publishInfo.retryMs );\r
+    }\r
+\r
+    /* Set the PUBLISH payload to the update document for Shadow UPDATE. */\r
+    if( pOperation->type == SHADOW_UPDATE )\r
+    {\r
+        publishInfo.pPayload = pDocumentInfo->u.update.pUpdateDocument;\r
+        publishInfo.payloadLength = pDocumentInfo->u.update.updateDocumentLength;\r
+    }\r
+\r
+    /* Set the PUBLISH payload to an empty string for Shadow DELETE and GET,\r
+     * per the Shadow spec. */\r
+    else\r
+    {\r
+        publishInfo.pPayload = "";\r
+        publishInfo.payloadLength = 0;\r
+    }\r
+\r
+    /* Add Shadow operation to the pending operations list. */\r
+    IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+    IotListDouble_InsertHead( &( _AwsIotShadowPendingOperations ),\r
+                              &( pOperation->link ) );\r
+    IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+\r
+    /* Publish to the Shadow topic name. */\r
+    publishStatus = IotMqtt_PublishSync( pOperation->mqttConnection,\r
+                                         &publishInfo,\r
+                                         0,\r
+                                         _AwsIotShadowMqttTimeoutMs );\r
+\r
+    /* Check for errors from the MQTT publish. */\r
+    if( publishStatus != IOT_MQTT_SUCCESS )\r
+    {\r
+        IotLogError( "Failed to publish MQTT message to %s %.*s Shadow, error %s.",\r
+                     _pAwsIotShadowOperationNames[ pOperation->type ],\r
+                     thingNameLength,\r
+                     pThingName,\r
+                     IotMqtt_strerror( publishStatus ) );\r
+\r
+        /* Convert the MQTT "NO MEMORY" error to a Shadow "NO MEMORY" error. */\r
+        status = SHADOW_CONVERT_STATUS_CODE_MQTT_TO_SHADOW( publishStatus );\r
+\r
+        /* If the "keep subscriptions" flag is not set, decrement the reference\r
+         * count. */\r
+        if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_KEEP_SUBSCRIPTIONS ) == 0 )\r
+        {\r
+            IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );\r
+            _AwsIotShadow_DecrementReferences( pOperation,\r
+                                               pTopicBuffer,\r
+                                               NULL );\r
+            IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );\r
+        }\r
+\r
+        /* Remove Shadow operation from the pending operations list. */\r
+        IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+        IotListDouble_Remove( &( pOperation->link ) );\r
+        IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+    }\r
+    else\r
+    {\r
+        IotLogDebug( "Shadow %s PUBLISH message successfully sent.",\r
+                     _pAwsIotShadowOperationNames[ pOperation->type ] );\r
+    }\r
+\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+    /* Free the topic buffer used by this function if it was not assigned to a\r
+     * subscription. */\r
+    if( ( freeTopicBuffer == true ) && ( pTopicBuffer != NULL ) )\r
+    {\r
+        AwsIotShadow_FreeString( pTopicBuffer );\r
+    }\r
+\r
+    /* Destroy the Shadow operation on failure. */\r
+    if( status != AWS_IOT_SHADOW_SUCCESS )\r
+    {\r
+        _AwsIotShadow_DestroyOperation( pOperation );\r
+    }\r
+    else\r
+    {\r
+        /* Convert successful return code to "status pending", as the Shadow\r
+         * library is now waiting for a response from the service. */\r
+        status = AWS_IOT_SHADOW_STATUS_PENDING;\r
+    }\r
+\r
+    IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r