--- /dev/null
+/*\r
+ * AWS IoT Shadow V2.1.0\r
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ */\r
+\r
+/**\r
+ * @file aws_iot_shadow_operation.c\r
+ * @brief Implements functions that process Shadow operations.\r
+ */\r
+\r
+/* The config header is always included first. */\r
+#include "iot_config.h"\r
+\r
+/* Standard includes. */\r
+#include <string.h>\r
+\r
+/* Shadow internal include. */\r
+#include "private/aws_iot_shadow_internal.h"\r
+\r
+/* Platform layer includes. */\r
+#include "platform/iot_threads.h"\r
+\r
+/* MQTT include. */\r
+#include "iot_mqtt.h"\r
+\r
+/* Error handling include. */\r
+#include "iot_error.h"\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief First parameter to #_shadowOperation_match.\r
+ */\r
+typedef struct _operationMatchParams\r
+{\r
+ _shadowOperationType_t type; /**< @brief DELETE, GET, or UPDATE. */\r
+ const char * pThingName; /**< @brief Thing Name of Shadow operation. */\r
+ size_t thingNameLength; /**< @brief Length of #_operationMatchParams_t.pThingName. */\r
+ const char * pDocument; /**< @brief Shadow UPDATE response document. */\r
+ size_t documentLength; /**< @brief Length of #_operationMatchParams_t.pDocument. */\r
+} _operationMatchParams_t;\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief Match a received Shadow response with a Shadow operation awaiting a\r
+ * response.\r
+ *\r
+ * @param[in] pOperationLink Pointer to the link member of the #_shadowOperation_t\r
+ * to check.\r
+ * @param[in] pMatch Pointer to an #_operationMatchParams_t.\r
+ *\r
+ * @return `true` if `pMatch` matches the received response; `false` otherwise.\r
+ */\r
+static bool _shadowOperation_match( const IotLink_t * pOperationLink,\r
+ void * pMatch );\r
+\r
+/**\r
+ * @brief Common function for processing received Shadow responses.\r
+ *\r
+ * @param[in] type DELETE, GET, or UPDATE.\r
+ * @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).\r
+ */\r
+static void _commonOperationCallback( _shadowOperationType_t type,\r
+ IotMqttCallbackParam_t * pMessage );\r
+\r
+/**\r
+ * @brief Invoked when a Shadow response is received for Shadow DELETE.\r
+ *\r
+ * @param[in] pArgument Ignored.\r
+ * @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).\r
+ */\r
+static void _deleteCallback( void * pArgument,\r
+ IotMqttCallbackParam_t * pMessage );\r
+\r
+/**\r
+ * @brief Invoked when a Shadow response is received for a Shadow GET.\r
+ *\r
+ * @param[in] pArgument Ignored.\r
+ * @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).\r
+ */\r
+static void _getCallback( void * pArgument,\r
+ IotMqttCallbackParam_t * pMessage );\r
+\r
+/**\r
+ * @brief Process an incoming Shadow document received when a Shadow GET is\r
+ * accepted.\r
+ *\r
+ * @param[in] pOperation The GET operation associated with the incoming Shadow\r
+ * document.\r
+ * @param[in] pPublishInfo The received Shadow document (as an MQTT PUBLISH\r
+ * message).\r
+ *\r
+ * @return #AWS_IOT_SHADOW_SUCCESS or #AWS_IOT_SHADOW_NO_MEMORY. Memory allocation\r
+ * only happens for a waitable `pOperation`.\r
+ */\r
+static AwsIotShadowError_t _processAcceptedGet( _shadowOperation_t * pOperation,\r
+ const IotMqttPublishInfo_t * pPublishInfo );\r
+\r
+/**\r
+ * @brief Invoked when a Shadow response is received for a Shadow UPDATE.\r
+ *\r
+ * @param[in] pArgument Ignored.\r
+ * @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).\r
+ */\r
+static void _updateCallback( void * pArgument,\r
+ IotMqttCallbackParam_t * pMessage );\r
+\r
+/**\r
+ * @brief Notify of a completed Shadow operation.\r
+ *\r
+ * @param[in] pOperation The operation which completed.\r
+ *\r
+ * Depending on the parameters passed to a user-facing Shadow function, the\r
+ * notification will cause @ref shadow_function_wait to return or invoke a\r
+ * user-provided callback.\r
+ */\r
+static void _notifyCompletion( _shadowOperation_t * pOperation );\r
+\r
+/**\r
+ * @brief Get a Shadow subscription to use with a Shadow operation.\r
+ *\r
+ * This function may use an existing Shadow subscription, or it may allocate a\r
+ * new one.\r
+ *\r
+ * @param[in] pThingName Thing Name associated with operation.\r
+ * @param[in] thingNameLength Length of `pThingName`.\r
+ * @param[in] pTopicBuffer Contains the topic to use for subscribing.\r
+ * @param[in] operationTopicLength The length of the base topic in `pTopicBuffer`.\r
+ * @param[in] pOperation Shadow operation that needs a subscription.\r
+ * @param[out] pFreeTopicBuffer Whether the caller may free `pTopicBuffer`\r
+ * (which may be assigned to a subscription).\r
+ *\r
+ * @return #AWS_IOT_SHADOW_SUCCESS or #AWS_IOT_SHADOW_NO_MEMORY\r
+ */\r
+static AwsIotShadowError_t _findSubscription( const char * pThingName,\r
+ size_t thingNameLength,\r
+ char * pTopicBuffer,\r
+ uint16_t operationTopicLength,\r
+ _shadowOperation_t * pOperation,\r
+ bool * pFreeTopicBuffer );\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+#if LIBRARY_LOG_LEVEL > IOT_LOG_NONE\r
+\r
+/**\r
+ * @brief Printable names for each of the Shadow operations.\r
+ */\r
+ const char * const _pAwsIotShadowOperationNames[] =\r
+ {\r
+ "DELETE",\r
+ "GET",\r
+ "UPDATE",\r
+ "SET DELTA",\r
+ "SET UPDATED"\r
+ };\r
+#endif /* if LIBRARY_LOG_LEVEL > IOT_LOG_NONE */\r
+\r
+/**\r
+ * @brief List of active Shadow operations awaiting a response from the Shadow\r
+ * service.\r
+ */\r
+IotListDouble_t _AwsIotShadowPendingOperations = { 0 };\r
+\r
+/**\r
+ * @brief Protects #_AwsIotShadowPendingOperations from concurrent access.\r
+ */\r
+IotMutex_t _AwsIotShadowPendingOperationsMutex;\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _shadowOperation_match( const IotLink_t * pOperationLink,\r
+ void * pMatch )\r
+{\r
+ /* Because this function is called from a container function, the given link\r
+ * must never be NULL. */\r
+ AwsIotShadow_Assert( pOperationLink != NULL );\r
+\r
+ _shadowOperation_t * pOperation = IotLink_Container( _shadowOperation_t,\r
+ pOperationLink,\r
+ link );\r
+ _operationMatchParams_t * pParam = ( _operationMatchParams_t * ) pMatch;\r
+ _shadowSubscription_t * pSubscription = pOperation->pSubscription;\r
+ const char * pClientToken = NULL;\r
+ size_t clientTokenLength = 0;\r
+\r
+ /* Check for matching Thing Name and operation type. */\r
+ bool match = ( pOperation->type == pParam->type ) &&\r
+ ( pParam->thingNameLength == pSubscription->thingNameLength ) &&\r
+ ( strncmp( pParam->pThingName,\r
+ pSubscription->pThingName,\r
+ pParam->thingNameLength ) == 0 );\r
+\r
+ /* For a Shadow UPDATE operation, compare the client tokens. */\r
+ if( ( match == true ) && ( pOperation->type == SHADOW_UPDATE ) )\r
+ {\r
+ /* Check document pointers. */\r
+ AwsIotShadow_Assert( pParam->pDocument != NULL );\r
+ AwsIotShadow_Assert( pParam->documentLength > 0 );\r
+ AwsIotShadow_Assert( pOperation->u.update.pClientToken != NULL );\r
+ AwsIotShadow_Assert( pOperation->u.update.clientTokenLength > 0 );\r
+\r
+ IotLogDebug( "Verifying client tokens for Shadow UPDATE." );\r
+\r
+ /* Check for the client token in the UPDATE response document. */\r
+ match = AwsIot_GetClientToken( pParam->pDocument,\r
+ pParam->documentLength,\r
+ &pClientToken,\r
+ &clientTokenLength );\r
+\r
+ /* If the UPDATE response document has a client token, check that it\r
+ * matches. */\r
+ if( match == true )\r
+ {\r
+ match = ( clientTokenLength == pOperation->u.update.clientTokenLength ) &&\r
+ ( strncmp( pClientToken,\r
+ pOperation->u.update.pClientToken,\r
+ clientTokenLength ) == 0 );\r
+ }\r
+ else\r
+ {\r
+ IotLogWarn( "Received a Shadow UPDATE response with no client token. "\r
+ "This is possibly a response to a bad JSON document:\r\n%.*s",\r
+ pParam->documentLength,\r
+ pParam->pDocument );\r
+ }\r
+ }\r
+\r
+ return match;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _commonOperationCallback( _shadowOperationType_t type,\r
+ IotMqttCallbackParam_t * pMessage )\r
+{\r
+ _shadowOperation_t * pOperation = NULL;\r
+ IotLink_t * pOperationLink = NULL;\r
+ AwsIotStatus_t status = AWS_IOT_UNKNOWN;\r
+ _operationMatchParams_t param = { .type = ( _shadowOperationType_t ) 0 };\r
+ uint32_t flags = 0;\r
+\r
+ /* Set operation type to search. */\r
+ param.type = type;\r
+\r
+ /* Set the response document for a Shadow UPDATE. */\r
+ if( type == SHADOW_UPDATE )\r
+ {\r
+ param.pDocument = pMessage->u.message.info.pPayload;\r
+ param.documentLength = pMessage->u.message.info.payloadLength;\r
+ }\r
+\r
+ /* Parse the Thing Name from the MQTT topic name. */\r
+ if( AwsIot_ParseThingName( pMessage->u.message.info.pTopicName,\r
+ pMessage->u.message.info.topicNameLength,\r
+ &( param.pThingName ),\r
+ &( param.thingNameLength ) ) == false )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Lock the pending operations list for exclusive access. */\r
+ IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+\r
+ /* Search for a matching pending operation. */\r
+ pOperationLink = IotListDouble_FindFirstMatch( &( _AwsIotShadowPendingOperations ),\r
+ NULL,\r
+ _shadowOperation_match,\r
+ ¶m );\r
+\r
+ /* Find and remove the first Shadow operation of the given type. */\r
+ if( pOperationLink == NULL )\r
+ {\r
+ /* Operation is not pending. It may have already been processed. Return\r
+ * without doing anything */\r
+ IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+\r
+ IotLogWarn( "Shadow %s callback received an unknown operation.",\r
+ _pAwsIotShadowOperationNames[ type ] );\r
+\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+ else\r
+ {\r
+ pOperation = IotLink_Container( _shadowOperation_t, pOperationLink, link );\r
+\r
+ /* Remove a non-waitable operation from the pending operation list.\r
+ * Waitable operations are removed by the Wait function. */\r
+ if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == 0 )\r
+ {\r
+ IotListDouble_Remove( &( pOperation->link ) );\r
+ IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+ }\r
+ }\r
+\r
+ /* Check that the Shadow operation type and status. */\r
+ AwsIotShadow_Assert( pOperation->type == type );\r
+ AwsIotShadow_Assert( pOperation->status == AWS_IOT_SHADOW_STATUS_PENDING );\r
+\r
+ IotLogDebug( "Received Shadow response on topic %.*s",\r
+ pMessage->u.message.info.topicNameLength,\r
+ pMessage->u.message.info.pTopicName );\r
+\r
+ /* Parse the status from the topic name. */\r
+ status = AwsIot_ParseStatus( pMessage->u.message.info.pTopicName,\r
+ pMessage->u.message.info.topicNameLength );\r
+\r
+ switch( status )\r
+ {\r
+ case AWS_IOT_ACCEPTED:\r
+ IotLogInfo( "Shadow %s of %.*s was ACCEPTED.",\r
+ _pAwsIotShadowOperationNames[ type ],\r
+ pOperation->pSubscription->thingNameLength,\r
+ pOperation->pSubscription->pThingName );\r
+\r
+ /* Process the retrieved document for a Shadow GET. Otherwise, set\r
+ * status to success. */\r
+ if( type == SHADOW_GET )\r
+ {\r
+ pOperation->status = _processAcceptedGet( pOperation,\r
+ &( pMessage->u.message.info ) );\r
+ }\r
+ else\r
+ {\r
+ pOperation->status = AWS_IOT_SHADOW_SUCCESS;\r
+ }\r
+\r
+ break;\r
+\r
+ case AWS_IOT_REJECTED:\r
+ IotLogWarn( "Shadow %s of %.*s was REJECTED.",\r
+ _pAwsIotShadowOperationNames[ type ],\r
+ pOperation->pSubscription->thingNameLength,\r
+ pOperation->pSubscription->pThingName );\r
+\r
+ pOperation->status = _AwsIotShadow_ParseErrorDocument( pMessage->u.message.info.pPayload,\r
+ pMessage->u.message.info.payloadLength );\r
+ break;\r
+\r
+ default:\r
+ IotLogWarn( "Unknown status for %s of %.*s Shadow. Ignoring message.",\r
+ _pAwsIotShadowOperationNames[ type ],\r
+ pOperation->pSubscription->thingNameLength,\r
+ pOperation->pSubscription->pThingName );\r
+\r
+ pOperation->status = AWS_IOT_SHADOW_BAD_RESPONSE;\r
+ break;\r
+ }\r
+\r
+ /* Copy the flags from the Shadow operation. The notify function may delete the operation. */\r
+ flags = pOperation->flags;\r
+\r
+ /* Notify of operation completion. */\r
+ _notifyCompletion( pOperation );\r
+\r
+ /* For waitable operations, unlock the pending operation list mutex to allow\r
+ * the Wait function to run. */\r
+ if( ( flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )\r
+ {\r
+ IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+ }\r
+\r
+ /* This function has no return value and no cleanup, but uses the cleanup\r
+ * label to exit on error. */\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _deleteCallback( void * pArgument,\r
+ IotMqttCallbackParam_t * pMessage )\r
+{\r
+ /* Silence warnings about unused parameter. */\r
+ ( void ) pArgument;\r
+\r
+ _commonOperationCallback( SHADOW_DELETE, pMessage );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _getCallback( void * pArgument,\r
+ IotMqttCallbackParam_t * pMessage )\r
+{\r
+ /* Silence warnings about unused parameter. */\r
+ ( void ) pArgument;\r
+\r
+ _commonOperationCallback( SHADOW_GET, pMessage );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static AwsIotShadowError_t _processAcceptedGet( _shadowOperation_t * pOperation,\r
+ const IotMqttPublishInfo_t * pPublishInfo )\r
+{\r
+ AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;\r
+\r
+ /* A non-waitable operation can re-use the pointers from the publish info,\r
+ * since those are guaranteed to be in-scope throughout the user callback.\r
+ * But a waitable operation must copy the data from the publish info because\r
+ * AwsIotShadow_Wait may be called after the MQTT library frees the publish\r
+ * info. */\r
+ if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == 0 )\r
+ {\r
+ pOperation->u.get.pDocument = pPublishInfo->pPayload;\r
+ pOperation->u.get.documentLength = pPublishInfo->payloadLength;\r
+ }\r
+ else\r
+ {\r
+ IotLogDebug( "Allocating new buffer for waitable Shadow GET." );\r
+\r
+ /* Parameter validation should not have allowed a NULL malloc function. */\r
+ AwsIotShadow_Assert( pOperation->u.get.mallocDocument != NULL );\r
+\r
+ /* Allocate a buffer for the retrieved document. */\r
+ pOperation->u.get.pDocument = pOperation->u.get.mallocDocument( pPublishInfo->payloadLength );\r
+\r
+ if( pOperation->u.get.pDocument == NULL )\r
+ {\r
+ IotLogError( "Failed to allocate buffer for retrieved Shadow document." );\r
+\r
+ status = AWS_IOT_SHADOW_NO_MEMORY;\r
+ }\r
+ else\r
+ {\r
+ /* Copy the retrieved document. */\r
+ ( void ) memcpy( ( void * ) pOperation->u.get.pDocument,\r
+ pPublishInfo->pPayload,\r
+ pPublishInfo->payloadLength );\r
+ pOperation->u.get.documentLength = pPublishInfo->payloadLength;\r
+ }\r
+ }\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _updateCallback( void * pArgument,\r
+ IotMqttCallbackParam_t * pMessage )\r
+{\r
+ /* Silence warnings about unused parameter. */\r
+ ( void ) pArgument;\r
+\r
+ _commonOperationCallback( SHADOW_UPDATE, pMessage );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _notifyCompletion( _shadowOperation_t * pOperation )\r
+{\r
+ AwsIotShadowCallbackParam_t callbackParam = { .callbackType = ( AwsIotShadowCallbackType_t ) 0 };\r
+ _shadowSubscription_t * pSubscription = pOperation->pSubscription,\r
+ * pRemovedSubscription = NULL;\r
+\r
+ /* If the operation is waiting, post to its wait semaphore and return. */\r
+ if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )\r
+ {\r
+ IotSemaphore_Post( &( pOperation->notify.waitSemaphore ) );\r
+ }\r
+ else\r
+ {\r
+ /* Decrement the reference count. This also removes subscriptions if the\r
+ * count reaches 0. */\r
+ IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );\r
+ _AwsIotShadow_DecrementReferences( pOperation,\r
+ pSubscription->pTopicBuffer,\r
+ &pRemovedSubscription );\r
+ IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );\r
+\r
+ /* Set the subscription pointer used for the user callback based on whether\r
+ * a subscription was removed from the list. */\r
+ if( pRemovedSubscription != NULL )\r
+ {\r
+ pSubscription = pRemovedSubscription;\r
+ }\r
+\r
+ AwsIotShadow_Assert( pSubscription != NULL );\r
+\r
+ /* Invoke the user callback if provided. */\r
+ if( pOperation->notify.callback.function != NULL )\r
+ {\r
+ /* Set the common members of the callback parameter. */\r
+ callbackParam.callbackType = ( AwsIotShadowCallbackType_t ) pOperation->type;\r
+ callbackParam.mqttConnection = pOperation->mqttConnection;\r
+ callbackParam.u.operation.result = pOperation->status;\r
+ callbackParam.u.operation.reference = pOperation;\r
+ callbackParam.pThingName = pSubscription->pThingName;\r
+ callbackParam.thingNameLength = pSubscription->thingNameLength;\r
+\r
+ /* Set the members of the callback parameter for a received document. */\r
+ if( pOperation->type == SHADOW_GET )\r
+ {\r
+ callbackParam.u.operation.get.pDocument = pOperation->u.get.pDocument;\r
+ callbackParam.u.operation.get.documentLength = pOperation->u.get.documentLength;\r
+ }\r
+\r
+ pOperation->notify.callback.function( pOperation->notify.callback.pCallbackContext,\r
+ &callbackParam );\r
+ }\r
+\r
+ /* Destroy a removed subscription. */\r
+ if( pRemovedSubscription != NULL )\r
+ {\r
+ _AwsIotShadow_DestroySubscription( pRemovedSubscription );\r
+ }\r
+\r
+ _AwsIotShadow_DestroyOperation( pOperation );\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static AwsIotShadowError_t _findSubscription( const char * pThingName,\r
+ size_t thingNameLength,\r
+ char * pTopicBuffer,\r
+ uint16_t operationTopicLength,\r
+ _shadowOperation_t * pOperation,\r
+ bool * pFreeTopicBuffer )\r
+{\r
+ AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;\r
+ _shadowSubscription_t * pSubscription = NULL;\r
+\r
+ /* Lookup table for Shadow operation callbacks. */\r
+ const AwsIotMqttCallbackFunction_t shadowCallbacks[ SHADOW_OPERATION_COUNT ] =\r
+ {\r
+ _deleteCallback,\r
+ _getCallback,\r
+ _updateCallback\r
+ };\r
+\r
+ /* Lock the subscriptions mutex for exclusive access. */\r
+ IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );\r
+\r
+ /* Check for an existing subscription. This function will attempt to allocate\r
+ * a new subscription if not found. */\r
+ pSubscription = _AwsIotShadow_FindSubscription( pThingName,\r
+ thingNameLength,\r
+ true );\r
+\r
+ if( pSubscription == NULL )\r
+ {\r
+ status = AWS_IOT_SHADOW_NO_MEMORY;\r
+ }\r
+ else\r
+ {\r
+ /* Ensure that the subscription Thing Name matches. */\r
+ AwsIotShadow_Assert( pSubscription != NULL );\r
+ AwsIotShadow_Assert( pSubscription->thingNameLength == thingNameLength );\r
+ AwsIotShadow_Assert( strncmp( pSubscription->pThingName,\r
+ pThingName,\r
+ thingNameLength ) == 0 );\r
+\r
+ /* Set the subscription object for the Shadow operation. */\r
+ pOperation->pSubscription = pSubscription;\r
+\r
+ /* Assign the topic buffer to the subscription to use for unsubscribing if\r
+ * the subscription has no topic buffer. */\r
+ if( pSubscription->pTopicBuffer == NULL )\r
+ {\r
+ pSubscription->pTopicBuffer = pTopicBuffer;\r
+\r
+ /* Don't free the topic buffer if it was allocated to the subscription. */\r
+ *pFreeTopicBuffer = false;\r
+ }\r
+ else\r
+ {\r
+ *pFreeTopicBuffer = true;\r
+ }\r
+\r
+ /* Increment the reference count for this Shadow operation's\r
+ * subscriptions. */\r
+ status = _AwsIotShadow_IncrementReferences( pOperation,\r
+ pTopicBuffer,\r
+ operationTopicLength,\r
+ shadowCallbacks[ pOperation->type ] );\r
+\r
+ if( status != AWS_IOT_SHADOW_SUCCESS )\r
+ {\r
+ /* Failed to add subscriptions for a Shadow operation. The reference\r
+ * count was not incremented. Check if this subscription should be\r
+ * deleted. */\r
+ _AwsIotShadow_RemoveSubscription( pSubscription, NULL );\r
+ }\r
+ }\r
+\r
+ /* Unlock the Shadow subscription list mutex. */\r
+ IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+AwsIotShadowError_t _AwsIotShadow_CreateOperation( _shadowOperation_t ** pNewOperation,\r
+ _shadowOperationType_t type,\r
+ uint32_t flags,\r
+ const AwsIotShadowCallbackInfo_t * pCallbackInfo )\r
+{\r
+ IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_SUCCESS );\r
+ _shadowOperation_t * pOperation = NULL;\r
+\r
+ IotLogDebug( "Creating operation record for Shadow %s.",\r
+ _pAwsIotShadowOperationNames[ type ] );\r
+\r
+ /* Allocate memory for a new Shadow operation. */\r
+ pOperation = AwsIotShadow_MallocOperation( sizeof( _shadowOperation_t ) );\r
+\r
+ if( pOperation == NULL )\r
+ {\r
+ IotLogError( "Failed to allocate memory for Shadow %s.",\r
+ _pAwsIotShadowOperationNames[ type ] );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );\r
+ }\r
+\r
+ /* Clear the operation data. */\r
+ ( void ) memset( pOperation, 0x00, sizeof( _shadowOperation_t ) );\r
+\r
+ /* Check if the waitable flag is set. If it is, create a semaphore to\r
+ * wait on. */\r
+ if( ( flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )\r
+ {\r
+ if( IotSemaphore_Create( &( pOperation->notify.waitSemaphore ), 0, 1 ) == false )\r
+ {\r
+ IotLogError( "Failed to create semaphore for waitable Shadow %s.",\r
+ _pAwsIotShadowOperationNames[ type ] );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ /* If the waitable flag isn't set but a callback is, copy the callback\r
+ * information. */\r
+ if( pCallbackInfo != NULL )\r
+ {\r
+ pOperation->notify.callback = *pCallbackInfo;\r
+ }\r
+ }\r
+\r
+ /* Set the remaining common members of the Shadow operation. */\r
+ pOperation->type = type;\r
+ pOperation->flags = flags;\r
+ pOperation->status = AWS_IOT_SHADOW_STATUS_PENDING;\r
+\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+ if( status != AWS_IOT_SHADOW_SUCCESS )\r
+ {\r
+ if( pOperation != NULL )\r
+ {\r
+ AwsIotShadow_FreeOperation( pOperation );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ /* Set the output parameter. */\r
+ *pNewOperation = pOperation;\r
+ }\r
+\r
+ IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _AwsIotShadow_DestroyOperation( void * pData )\r
+{\r
+ _shadowOperation_t * pOperation = ( _shadowOperation_t * ) pData;\r
+\r
+ /* The Shadow operation pointer must not be NULL. */\r
+ AwsIotShadow_Assert( pOperation != NULL );\r
+\r
+ IotLogDebug( "Destroying Shadow operation %s.",\r
+ _pAwsIotShadowOperationNames[ pOperation->type ] );\r
+\r
+ /* Check if a wait semaphore was created for this operation. */\r
+ if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )\r
+ {\r
+ /* Destroy the wait semaphore */\r
+ IotSemaphore_Destroy( &( pOperation->notify.waitSemaphore ) );\r
+ }\r
+\r
+ /* If this is a Shadow update, free any allocated client token. */\r
+ if( ( pOperation->type == SHADOW_UPDATE ) &&\r
+ ( pOperation->u.update.pClientToken != NULL ) )\r
+ {\r
+ AwsIotShadow_Assert( pOperation->u.update.clientTokenLength > 0 );\r
+\r
+ AwsIotShadow_FreeString( ( void * ) ( pOperation->u.update.pClientToken ) );\r
+ }\r
+\r
+ /* Free the memory used to hold operation data. */\r
+ AwsIotShadow_FreeOperation( pOperation );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+AwsIotShadowError_t _AwsIotShadow_GenerateShadowTopic( _shadowOperationType_t type,\r
+ const char * pThingName,\r
+ size_t thingNameLength,\r
+ char ** pTopicBuffer,\r
+ uint16_t * pOperationTopicLength )\r
+{\r
+ AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;\r
+ AwsIotTopicInfo_t topicInfo = { 0 };\r
+\r
+ /* Lookup table for Shadow operation strings. */\r
+ const char * const pOperationString[ SHADOW_OPERATION_COUNT ] =\r
+ {\r
+ SHADOW_DELETE_OPERATION_STRING, /* Shadow delete operation. */\r
+ SHADOW_GET_OPERATION_STRING, /* Shadow get operation. */\r
+ SHADOW_UPDATE_OPERATION_STRING /* Shadow update operation. */\r
+ };\r
+\r
+ /* Lookup table for Shadow operation string lengths. */\r
+ const uint16_t pOperationStringLength[ SHADOW_OPERATION_COUNT ] =\r
+ {\r
+ SHADOW_DELETE_OPERATION_STRING_LENGTH, /* Shadow delete operation. */\r
+ SHADOW_GET_OPERATION_STRING_LENGTH, /* Shadow get operation. */\r
+ SHADOW_UPDATE_OPERATION_STRING_LENGTH /* Shadow update operation. */\r
+ };\r
+\r
+ /* Only Shadow delete, get, and update operation types should be passed to this\r
+ * function. */\r
+ AwsIotShadow_Assert( ( type == SHADOW_DELETE ) ||\r
+ ( type == SHADOW_GET ) ||\r
+ ( type == SHADOW_UPDATE ) );\r
+\r
+ /* Set the members needed to generate an operation topic. */\r
+ topicInfo.pThingName = pThingName;\r
+ topicInfo.thingNameLength = thingNameLength;\r
+ topicInfo.pOperationName = pOperationString[ type ];\r
+ topicInfo.operationNameLength = pOperationStringLength[ type ];\r
+ topicInfo.longestSuffixLength = SHADOW_LONGEST_SUFFIX_LENGTH;\r
+ topicInfo.mallocString = AwsIotShadow_MallocString;\r
+\r
+ if( AwsIot_GenerateOperationTopic( &topicInfo,\r
+ pTopicBuffer,\r
+ pOperationTopicLength ) == false )\r
+ {\r
+ status = AWS_IOT_SHADOW_NO_MEMORY;\r
+ }\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+AwsIotShadowError_t _AwsIotShadow_ProcessOperation( IotMqttConnection_t mqttConnection,\r
+ const char * pThingName,\r
+ size_t thingNameLength,\r
+ _shadowOperation_t * pOperation,\r
+ const AwsIotShadowDocumentInfo_t * pDocumentInfo )\r
+{\r
+ IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_STATUS_PENDING );\r
+ IotMqttError_t publishStatus = IOT_MQTT_STATUS_PENDING;\r
+ char * pTopicBuffer = NULL;\r
+ uint16_t operationTopicLength = 0;\r
+ bool freeTopicBuffer = true;\r
+ IotMqttPublishInfo_t publishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;\r
+\r
+ IotLogDebug( "Processing Shadow operation %s for Thing %.*s.",\r
+ _pAwsIotShadowOperationNames[ pOperation->type ],\r
+ thingNameLength,\r
+ pThingName );\r
+\r
+ /* Set the operation's MQTT connection. */\r
+ pOperation->mqttConnection = mqttConnection;\r
+\r
+ /* Generate the operation topic buffer. */\r
+ status = _AwsIotShadow_GenerateShadowTopic( pOperation->type,\r
+ pThingName,\r
+ thingNameLength,\r
+ &pTopicBuffer,\r
+ &operationTopicLength );\r
+\r
+ if( status != AWS_IOT_SHADOW_SUCCESS )\r
+ {\r
+ IotLogError( "No memory for Shadow operation topic buffer." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );\r
+ }\r
+\r
+ /* Get a subscription object for this Shadow operation. */\r
+ status = _findSubscription( pThingName,\r
+ thingNameLength,\r
+ pTopicBuffer,\r
+ operationTopicLength,\r
+ pOperation,\r
+ &freeTopicBuffer );\r
+\r
+ if( status != AWS_IOT_SHADOW_SUCCESS )\r
+ {\r
+ /* No subscription was found and no subscription could be allocated. */\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Set the operation topic name. */\r
+ publishInfo.pTopicName = pTopicBuffer;\r
+ publishInfo.topicNameLength = operationTopicLength;\r
+\r
+ IotLogDebug( "Shadow %s message will be published to topic %.*s",\r
+ _pAwsIotShadowOperationNames[ pOperation->type ],\r
+ publishInfo.topicNameLength,\r
+ publishInfo.pTopicName );\r
+\r
+ /* Set the document info if this operation is not a Shadow DELETE. */\r
+ if( pOperation->type != SHADOW_DELETE )\r
+ {\r
+ publishInfo.qos = pDocumentInfo->qos;\r
+ publishInfo.retryLimit = pDocumentInfo->retryLimit;\r
+ publishInfo.retryMs = pDocumentInfo->retryMs;\r
+\r
+ IotLogDebug( "Shadow %s message will be published at QoS %d with "\r
+ "retryLimit %d and retryMs %llu.",\r
+ _pAwsIotShadowOperationNames[ pOperation->type ],\r
+ publishInfo.qos,\r
+ publishInfo.retryLimit,\r
+ publishInfo.retryMs );\r
+ }\r
+\r
+ /* Set the PUBLISH payload to the update document for Shadow UPDATE. */\r
+ if( pOperation->type == SHADOW_UPDATE )\r
+ {\r
+ publishInfo.pPayload = pDocumentInfo->u.update.pUpdateDocument;\r
+ publishInfo.payloadLength = pDocumentInfo->u.update.updateDocumentLength;\r
+ }\r
+\r
+ /* Set the PUBLISH payload to an empty string for Shadow DELETE and GET,\r
+ * per the Shadow spec. */\r
+ else\r
+ {\r
+ publishInfo.pPayload = "";\r
+ publishInfo.payloadLength = 0;\r
+ }\r
+\r
+ /* Add Shadow operation to the pending operations list. */\r
+ IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+ IotListDouble_InsertHead( &( _AwsIotShadowPendingOperations ),\r
+ &( pOperation->link ) );\r
+ IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+\r
+ /* Publish to the Shadow topic name. */\r
+ publishStatus = IotMqtt_PublishSync( pOperation->mqttConnection,\r
+ &publishInfo,\r
+ 0,\r
+ _AwsIotShadowMqttTimeoutMs );\r
+\r
+ /* Check for errors from the MQTT publish. */\r
+ if( publishStatus != IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogError( "Failed to publish MQTT message to %s %.*s Shadow, error %s.",\r
+ _pAwsIotShadowOperationNames[ pOperation->type ],\r
+ thingNameLength,\r
+ pThingName,\r
+ IotMqtt_strerror( publishStatus ) );\r
+\r
+ /* Convert the MQTT "NO MEMORY" error to a Shadow "NO MEMORY" error. */\r
+ status = SHADOW_CONVERT_STATUS_CODE_MQTT_TO_SHADOW( publishStatus );\r
+\r
+ /* If the "keep subscriptions" flag is not set, decrement the reference\r
+ * count. */\r
+ if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_KEEP_SUBSCRIPTIONS ) == 0 )\r
+ {\r
+ IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );\r
+ _AwsIotShadow_DecrementReferences( pOperation,\r
+ pTopicBuffer,\r
+ NULL );\r
+ IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );\r
+ }\r
+\r
+ /* Remove Shadow operation from the pending operations list. */\r
+ IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+ IotListDouble_Remove( &( pOperation->link ) );\r
+ IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );\r
+ }\r
+ else\r
+ {\r
+ IotLogDebug( "Shadow %s PUBLISH message successfully sent.",\r
+ _pAwsIotShadowOperationNames[ pOperation->type ] );\r
+ }\r
+\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+ /* Free the topic buffer used by this function if it was not assigned to a\r
+ * subscription. */\r
+ if( ( freeTopicBuffer == true ) && ( pTopicBuffer != NULL ) )\r
+ {\r
+ AwsIotShadow_FreeString( pTopicBuffer );\r
+ }\r
+\r
+ /* Destroy the Shadow operation on failure. */\r
+ if( status != AWS_IOT_SHADOW_SUCCESS )\r
+ {\r
+ _AwsIotShadow_DestroyOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ /* Convert successful return code to "status pending", as the Shadow\r
+ * library is now waiting for a response from the service. */\r
+ status = AWS_IOT_SHADOW_STATUS_PENDING;\r
+ }\r
+\r
+ IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r