--- /dev/null
+/*\r
+ * Amazon FreeRTOS MQTT V2.0.0\r
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ *\r
+ * http://aws.amazon.com/freertos\r
+ * http://www.FreeRTOS.org\r
+ */\r
+\r
+/**\r
+ * @file iot_mqtt_operation.c\r
+ * @brief Implements functions that process MQTT operations.\r
+ */\r
+\r
+/* The config header is always included first. */\r
+#include "iot_config.h"\r
+\r
+/* Standard includes. */\r
+#include <string.h>\r
+\r
+/* Error handling include. */\r
+#include "private/iot_error.h"\r
+\r
+/* MQTT internal include. */\r
+#include "private/iot_mqtt_internal.h"\r
+\r
+/* Platform layer includes. */\r
+#include "platform/iot_clock.h"\r
+#include "platform/iot_threads.h"\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief First parameter to #_mqttOperation_match.\r
+ */\r
+typedef struct _operationMatchParam\r
+{\r
+ IotMqttOperationType_t type; /**< @brief The type of operation to look for. */\r
+ const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation.\r
+ * Set to `NULL` to ignore packet identifier. */\r
+} _operationMatchParam_t;\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief Match an MQTT operation by type and packet identifier.\r
+ *\r
+ * @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t.\r
+ * @param[in] pMatch Pointer to an #_operationMatchParam_t.\r
+ *\r
+ * @return `true` if the operation matches the parameters in `pArgument`; `false`\r
+ * otherwise.\r
+ */\r
+static bool _mqttOperation_match( const IotLink_t * pOperationLink,\r
+ void * pMatch );\r
+\r
+/**\r
+ * @brief Check if an operation with retry has exceeded its retry limit.\r
+ *\r
+ * If a PUBLISH operation is available for retry, this function also sets any\r
+ * necessary DUP flags.\r
+ *\r
+ * @param[in] pOperation The operation to check.\r
+ *\r
+ * @return `true` if the operation may be retried; `false` otherwise.\r
+ */\r
+static bool _checkRetryLimit( _mqttOperation_t * pOperation );\r
+\r
+/**\r
+ * @brief Schedule the next send of an operation with retry.\r
+ *\r
+ * @param[in] pOperation The operation to schedule.\r
+ *\r
+ * @return `true` if the reschedule succeeded; `false` otherwise.\r
+ */\r
+static bool _scheduleNextRetry( _mqttOperation_t * pOperation );\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _mqttOperation_match( const IotLink_t * pOperationLink,\r
+ void * pMatch )\r
+{\r
+ bool match = false;\r
+\r
+ /* Because this function is called from a container function, the given link\r
+ * must never be NULL. */\r
+ IotMqtt_Assert( pOperationLink != NULL );\r
+\r
+ _mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t,\r
+ pOperationLink,\r
+ link );\r
+ _operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch;\r
+\r
+ /* Check for matching operations. */\r
+ if( pParam->type == pOperation->u.operation.type )\r
+ {\r
+ /* Check for matching packet identifiers. */\r
+ if( pParam->pPacketIdentifier == NULL )\r
+ {\r
+ match = true;\r
+ }\r
+ else\r
+ {\r
+ match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ return match;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _checkRetryLimit( _mqttOperation_t * pOperation )\r
+{\r
+ _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+ bool status = true;\r
+\r
+ /* Choose a set DUP function. */\r
+ void ( * publishSetDup )( uint8_t *,\r
+ uint8_t *,\r
+ uint16_t * ) = _IotMqtt_PublishSetDup;\r
+\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( pMqttConnection->pSerializer != NULL )\r
+ {\r
+ if( pMqttConnection->pSerializer->serialize.publishSetDup != NULL )\r
+ {\r
+ publishSetDup = pMqttConnection->pSerializer->serialize.publishSetDup;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ /* Only PUBLISH may be retried. */\r
+ IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );\r
+\r
+ /* Check if the retry limit is exhausted. */\r
+ if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
+ {\r
+ /* The retry count may be at most one more than the retry limit, which\r
+ * accounts for the final check for a PUBACK. */\r
+ IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 );\r
+\r
+ IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",\r
+ pMqttConnection,\r
+ pOperation,\r
+ pOperation->u.operation.retry.limit );\r
+\r
+ status = false;\r
+ }\r
+ /* Check if this is the first retry. */\r
+ else if( pOperation->u.operation.retry.count == 1 )\r
+ {\r
+ /* Always set the DUP flag on the first retry. */\r
+ publishSetDup( pOperation->u.operation.pMqttPacket,\r
+ pOperation->u.operation.pPacketIdentifierHigh,\r
+ &( pOperation->u.operation.packetIdentifier ) );\r
+ }\r
+ else\r
+ {\r
+ /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet\r
+ * identifier) must be reset on every retry. */\r
+ if( pMqttConnection->awsIotMqttMode == true )\r
+ {\r
+ publishSetDup( pOperation->u.operation.pMqttPacket,\r
+ pOperation->u.operation.pPacketIdentifierHigh,\r
+ &( pOperation->u.operation.packetIdentifier ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _scheduleNextRetry( _mqttOperation_t * pOperation )\r
+{\r
+ bool firstRetry = false;\r
+ uint32_t scheduleDelay = 0;\r
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+ _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+ /* This function should never be called with retry count greater than\r
+ * retry limit. */\r
+ IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit );\r
+\r
+ /* Increment the retry count. */\r
+ ( pOperation->u.operation.retry.count )++;\r
+\r
+ /* Check for a response shortly for the final retry. Otherwise, calculate the\r
+ * next retry period. */\r
+ if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
+ {\r
+ scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;\r
+\r
+ IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check "\r
+ "for response in %d ms.",\r
+ pMqttConnection,\r
+ pOperation,\r
+ IOT_MQTT_RESPONSE_WAIT_MS );\r
+ }\r
+ else\r
+ {\r
+ scheduleDelay = pOperation->u.operation.retry.nextPeriod;\r
+\r
+ /* Double the retry period, subject to a ceiling value. */\r
+ pOperation->u.operation.retry.nextPeriod *= 2;\r
+\r
+ if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING )\r
+ {\r
+ pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",\r
+ pMqttConnection,\r
+ pOperation,\r
+ ( unsigned long ) pOperation->u.operation.retry.count,\r
+ ( unsigned long ) pOperation->u.operation.retry.limit,\r
+ ( unsigned long ) scheduleDelay );\r
+\r
+ /* Check if this is the first retry. */\r
+ firstRetry = ( pOperation->u.operation.retry.count == 1 );\r
+\r
+ /* On the first retry, the PUBLISH will be moved from the pending processing\r
+ * list to the pending responses list. Lock the connection references mutex\r
+ * to manipulate the lists. */\r
+ if( firstRetry == true )\r
+ {\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+\r
+ /* Reschedule the PUBLISH for another send. */\r
+ status = _IotMqtt_ScheduleOperation( pOperation,\r
+ _IotMqtt_ProcessSend,\r
+ scheduleDelay );\r
+\r
+ /* Check for successful reschedule. */\r
+ if( status == IOT_MQTT_SUCCESS )\r
+ {\r
+ /* Move a successfully rescheduled PUBLISH from the pending processing\r
+ * list to the pending responses list on the first retry. */\r
+ if( firstRetry == true )\r
+ {\r
+ /* Operation must be linked. */\r
+ IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) == true );\r
+\r
+ /* Transfer to pending response list. */\r
+ IotListDouble_Remove( &( pOperation->link ) );\r
+ IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),\r
+ &( pOperation->link ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* The references mutex only needs to be unlocked on the first retry, since\r
+ * only the first retry manipulates the connection lists. */\r
+ if( firstRetry == true )\r
+ {\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ return( status == IOT_MQTT_SUCCESS );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,\r
+ uint32_t flags,\r
+ const IotMqttCallbackInfo_t * pCallbackInfo,\r
+ _mqttOperation_t ** pNewOperation )\r
+{\r
+ IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
+ bool decrementOnError = false;\r
+ _mqttOperation_t * pOperation = NULL;\r
+ bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );\r
+\r
+ /* If the waitable flag is set, make sure that there's no callback. */\r
+ if( waitable == true )\r
+ {\r
+ if( pCallbackInfo != NULL )\r
+ {\r
+ IotLogError( "Callback should not be set for a waitable operation." );\r
+\r
+ return IOT_MQTT_BAD_PARAMETER;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotLogDebug( "(MQTT connection %p) Creating new operation record.",\r
+ pMqttConnection );\r
+\r
+ /* Increment the reference count for the MQTT connection when creating a new\r
+ * operation. */\r
+ if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false )\r
+ {\r
+ IotLogError( "(MQTT connection %p) New operation record cannot be created"\r
+ " for a closed connection",\r
+ pMqttConnection );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
+ }\r
+ else\r
+ {\r
+ /* Reference count will need to be decremented on error. */\r
+ decrementOnError = true;\r
+ }\r
+\r
+ /* Allocate memory for a new operation. */\r
+ pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );\r
+\r
+ if( pOperation == NULL )\r
+ {\r
+ IotLogError( "(MQTT connection %p) Failed to allocate memory for new "\r
+ "operation record.",\r
+ pMqttConnection );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
+ }\r
+ else\r
+ {\r
+ /* Clear the operation data. */\r
+ ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );\r
+\r
+ /* Initialize some members of the new operation. */\r
+ pOperation->pMqttConnection = pMqttConnection;\r
+ pOperation->u.operation.jobReference = 1;\r
+ pOperation->u.operation.flags = flags;\r
+ pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING;\r
+ }\r
+\r
+ /* Check if the waitable flag is set. If it is, create a semaphore to\r
+ * wait on. */\r
+ if( waitable == true )\r
+ {\r
+ /* Create a semaphore to wait on for a waitable operation. */\r
+ if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false )\r
+ {\r
+ IotLogError( "(MQTT connection %p) Failed to create semaphore for "\r
+ "waitable operation.",\r
+ pMqttConnection );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
+ }\r
+ else\r
+ {\r
+ /* A waitable operation is created with an additional reference for the\r
+ * Wait function. */\r
+ ( pOperation->u.operation.jobReference )++;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ /* If the waitable flag isn't set but a callback is, copy the callback\r
+ * information. */\r
+ if( pCallbackInfo != NULL )\r
+ {\r
+ pOperation->u.operation.notify.callback = *pCallbackInfo;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+\r
+ /* Add this operation to the MQTT connection's operation list. */\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+ IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
+ &( pOperation->link ) );\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* Set the output parameter. */\r
+ *pNewOperation = pOperation;\r
+\r
+ /* Clean up operation and decrement reference count if this function failed. */\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ if( decrementOnError == true )\r
+ {\r
+ _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( pOperation != NULL )\r
+ {\r
+ IotMqtt_FreeOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,\r
+ bool cancelJob )\r
+{\r
+ bool destroyOperation = false;\r
+ IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+ _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+ /* Attempt to cancel the operation's job. */\r
+ if( cancelJob == true )\r
+ {\r
+ taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
+ pOperation->job,\r
+ NULL );\r
+\r
+ /* If the operation's job was not canceled, it must be already executing.\r
+ * Any other return value is invalid. */\r
+ IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
+ ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
+\r
+ if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Decrement job reference count. */\r
+ if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
+ {\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+ pOperation->u.operation.jobReference--;\r
+\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed"\r
+ " from %ld to %ld.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation,\r
+ pOperation->u.operation.jobReference + 1,\r
+ pOperation->u.operation.jobReference );\r
+\r
+ /* The job reference count must be 0 or 1 after the decrement. */\r
+ IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||\r
+ ( pOperation->u.operation.jobReference == 1 ) );\r
+\r
+ /* This operation may be destroyed if its reference count is 0. */\r
+ if( pOperation->u.operation.jobReference == 0 )\r
+ {\r
+ destroyOperation = true;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ return destroyOperation;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation )\r
+{\r
+ _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+ /* Default free packet function. */\r
+ void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
+\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+\r
+ /* The job reference count must be between 0 and 2. */\r
+ IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) &&\r
+ ( pOperation->u.operation.jobReference <= 2 ) );\r
+\r
+ /* Jobs to be destroyed should be removed from the MQTT connection's\r
+ * lists. */\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation,\r
+ pMqttConnection );\r
+\r
+ IotListDouble_Remove( &( pOperation->link ) );\r
+ }\r
+ else\r
+ {\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+ }\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* Free any allocated MQTT packet. */\r
+ if( pOperation->u.operation.pMqttPacket != NULL )\r
+ {\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( pMqttConnection->pSerializer != NULL )\r
+ {\r
+ if( pMqttConnection->pSerializer->freePacket != NULL )\r
+ {\r
+ freePacket = pMqttConnection->pSerializer->freePacket;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ freePacket( pOperation->u.operation.pMqttPacket );\r
+\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+ }\r
+ else\r
+ {\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+ }\r
+\r
+ /* Check if a wait semaphore was created for this operation. */\r
+ if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
+ {\r
+ IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) );\r
+\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+\r
+ /* Free the memory used to hold operation data. */\r
+ IotMqtt_FreeOperation( pOperation );\r
+\r
+ /* Decrement the MQTT connection's reference count after destroying an\r
+ * operation. */\r
+ _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,\r
+ IotTaskPoolJob_t pKeepAliveJob,\r
+ void * pContext )\r
+{\r
+ bool status = true;\r
+ IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+ size_t bytesSent = 0;\r
+\r
+ /* Retrieve the MQTT connection from the context. */\r
+ _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;\r
+\r
+ /* Check parameters. */\r
+ IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );\r
+ IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob );\r
+\r
+ /* Check that keep-alive interval is valid. The MQTT spec states its maximum\r
+ * value is 65,535 seconds. */\r
+ IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 );\r
+\r
+ /* Only two values are valid for the next keep alive job delay. */\r
+ IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) ||\r
+ ( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) );\r
+\r
+ IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );\r
+\r
+ /* Re-create the keep-alive job for rescheduling. This should never fail. */\r
+ taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
+ pContext,\r
+ IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ),\r
+ &pKeepAliveJob );\r
+ IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
+\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* Determine whether to send a PINGREQ or check for PINGRESP. */\r
+ if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );\r
+\r
+ /* Because PINGREQ may be used to keep the MQTT connection alive, it is\r
+ * more important than other operations. Bypass the queue of jobs for\r
+ * operations by directly sending the PINGREQ in this job. */\r
+ bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
+ pMqttConnection->pPingreqPacket,\r
+ pMqttConnection->pingreqPacketSize );\r
+\r
+ if( bytesSent != pMqttConnection->pingreqPacketSize )\r
+ {\r
+ IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );\r
+ status = false;\r
+ }\r
+ else\r
+ {\r
+ /* Assume the keep-alive will fail. The network receive callback will\r
+ * clear the failure flag upon receiving a PINGRESP. */\r
+ pMqttConnection->keepAliveFailure = true;\r
+\r
+ /* Schedule a check for PINGRESP. */\r
+ pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS;\r
+\r
+ IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",\r
+ pMqttConnection,\r
+ IOT_MQTT_RESPONSE_WAIT_MS );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );\r
+\r
+ if( pMqttConnection->keepAliveFailure == false )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );\r
+\r
+ /* PINGRESP was received. Schedule the next PINGREQ transmission. */\r
+ pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+ }\r
+ else\r
+ {\r
+ IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.",\r
+ pMqttConnection,\r
+ IOT_MQTT_RESPONSE_WAIT_MS );\r
+\r
+ /* The network receive callback did not clear the failure flag. */\r
+ status = false;\r
+ }\r
+ }\r
+\r
+ /* When a PINGREQ is successfully sent, reschedule this job to check for a\r
+ * response shortly. */\r
+ if( status == true )\r
+ {\r
+ taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,\r
+ pKeepAliveJob,\r
+ pMqttConnection->nextKeepAliveMs );\r
+\r
+ if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.",\r
+ pMqttConnection,\r
+ IOT_MQTT_RESPONSE_WAIT_MS );\r
+ }\r
+ else\r
+ {\r
+ IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.",\r
+ pMqttConnection,\r
+ IotTaskPool_strerror( taskPoolStatus ) );\r
+\r
+ status = false;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Close the connection on failures. */\r
+ if( status == false )\r
+ {\r
+ _IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT,\r
+ pMqttConnection );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,\r
+ IotTaskPoolJob_t pPublishJob,\r
+ void * pContext )\r
+{\r
+ _mqttOperation_t * pOperation = pContext;\r
+ IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL };\r
+\r
+ /* Check parameters. The task pool and job parameter is not used when asserts\r
+ * are disabled. */\r
+ ( void ) pTaskPool;\r
+ ( void ) pPublishJob;\r
+ IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );\r
+ IotMqtt_Assert( pOperation->incomingPublish == true );\r
+ IotMqtt_Assert( pPublishJob == pOperation->job );\r
+\r
+ /* Remove the operation from the pending processing list. */\r
+ IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) );\r
+\r
+ if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
+ {\r
+ IotListDouble_Remove( &( pOperation->link ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) );\r
+\r
+ /* Process the current PUBLISH. */\r
+ callbackParam.u.message.info = pOperation->u.publish.publishInfo;\r
+\r
+ _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,\r
+ &callbackParam );\r
+\r
+ /* Free any buffers associated with the current PUBLISH message. */\r
+ if( pOperation->u.publish.pReceivedData != NULL )\r
+ {\r
+ IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Free the incoming PUBLISH operation. */\r
+ IotMqtt_FreeOperation( pOperation );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,\r
+ IotTaskPoolJob_t pSendJob,\r
+ void * pContext )\r
+{\r
+ size_t bytesSent = 0;\r
+ bool destroyOperation = false, waitable = false, networkPending = false;\r
+ _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;\r
+ _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+ /* Check parameters. The task pool and job parameter is not used when asserts\r
+ * are disabled. */\r
+ ( void ) pTaskPool;\r
+ ( void ) pSendJob;\r
+ IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );\r
+ IotMqtt_Assert( pSendJob == pOperation->job );\r
+\r
+ /* The given operation must have an allocated packet and be waiting for a status. */\r
+ IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
+ IotMqtt_Assert( pOperation->u.operation.packetSize != 0 );\r
+ IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+\r
+ /* Check if this operation is waitable. */\r
+ waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
+\r
+ /* Check PUBLISH retry counts and limits. */\r
+ if( pOperation->u.operation.retry.limit > 0 )\r
+ {\r
+ if( _checkRetryLimit( pOperation ) == false )\r
+ {\r
+ pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Send an operation that is waiting for a response. */\r
+ if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+\r
+ /* Transmit the MQTT packet from the operation over the network. */\r
+ bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
+ pOperation->u.operation.pMqttPacket,\r
+ pOperation->u.operation.packetSize );\r
+\r
+ /* Check transmission status. */\r
+ if( bytesSent != pOperation->u.operation.packetSize )\r
+ {\r
+ pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR;\r
+ }\r
+ else\r
+ {\r
+ /* DISCONNECT operations are considered successful upon successful\r
+ * transmission. In addition, non-waitable operations with no callback\r
+ * may also be considered successful. */\r
+ if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT )\r
+ {\r
+ /* DISCONNECT operations are always waitable. */\r
+ IotMqtt_Assert( waitable == true );\r
+\r
+ pOperation->u.operation.status = IOT_MQTT_SUCCESS;\r
+ }\r
+ else if( waitable == false )\r
+ {\r
+ if( pOperation->u.operation.notify.callback.function == NULL )\r
+ {\r
+ pOperation->u.operation.status = IOT_MQTT_SUCCESS;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check if this operation requires further processing. */\r
+ if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
+ {\r
+ /* Check if this operation should be scheduled for retransmission. */\r
+ if( pOperation->u.operation.retry.limit > 0 )\r
+ {\r
+ if( _scheduleNextRetry( pOperation ) == false )\r
+ {\r
+ pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR;\r
+ }\r
+ else\r
+ {\r
+ /* A successfully scheduled PUBLISH retry is awaiting a response\r
+ * from the network. */\r
+ networkPending = true;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ /* Decrement reference count to signal completion of send job. Check\r
+ * if the operation should be destroyed. */\r
+ if( waitable == true )\r
+ {\r
+ destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* If the operation should not be destroyed, transfer it from the\r
+ * pending processing to the pending response list. */\r
+ if( destroyOperation == false )\r
+ {\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* Operation must be linked. */\r
+ IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) );\r
+\r
+ /* Transfer to pending response list. */\r
+ IotListDouble_Remove( &( pOperation->link ) );\r
+ IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),\r
+ &( pOperation->link ) );\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* This operation is now awaiting a response from the network. */\r
+ networkPending = true;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Destroy the operation or notify of completion if necessary. */\r
+ if( destroyOperation == true )\r
+ {\r
+ _IotMqtt_DestroyOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ /* Do not check the operation status if a network response is pending,\r
+ * since a network response could modify the status. */\r
+ if( networkPending == false )\r
+ {\r
+ /* Notify of operation completion if this job set a status. */\r
+ if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )\r
+ {\r
+ _IotMqtt_Notify( pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,\r
+ IotTaskPoolJob_t pOperationJob,\r
+ void * pContext )\r
+{\r
+ _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;\r
+ IotMqttCallbackParam_t callbackParam = { 0 };\r
+\r
+ /* Check parameters. The task pool and job parameter is not used when asserts\r
+ * are disabled. */\r
+ ( void ) pTaskPool;\r
+ ( void ) pOperationJob;\r
+ IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );\r
+ IotMqtt_Assert( pOperationJob == pOperation->job );\r
+\r
+ /* The operation's callback function and status must be set. */\r
+ IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL );\r
+ IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );\r
+\r
+ callbackParam.mqttConnection = pOperation->pMqttConnection;\r
+ callbackParam.u.operation.type = pOperation->u.operation.type;\r
+ callbackParam.u.operation.reference = pOperation;\r
+ callbackParam.u.operation.result = pOperation->u.operation.status;\r
+\r
+ /* Invoke the user callback function. */\r
+ pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,\r
+ &callbackParam );\r
+\r
+ /* Attempt to destroy the operation once the user callback returns. */\r
+ if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )\r
+ {\r
+ _IotMqtt_DestroyOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,\r
+ IotTaskPoolRoutine_t jobRoutine,\r
+ uint32_t delay )\r
+{\r
+ IotMqttError_t status = IOT_MQTT_SUCCESS;\r
+ IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+\r
+ /* Check that job routine is valid. */\r
+ IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) ||\r
+ ( jobRoutine == _IotMqtt_ProcessCompletedOperation ) ||\r
+ ( jobRoutine == _IotMqtt_ProcessIncomingPublish ) );\r
+\r
+ /* Creating a new job should never fail when parameters are valid. */\r
+ taskPoolStatus = IotTaskPool_CreateJob( jobRoutine,\r
+ pOperation,\r
+ &( pOperation->jobStorage ),\r
+ &( pOperation->job ) );\r
+ IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
+\r
+ /* Schedule the new job with a delay. */\r
+ taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
+ pOperation->job,\r
+ delay );\r
+\r
+ if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
+ {\r
+ /* Scheduling a newly-created job should never be invalid or illegal. */\r
+ IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER );\r
+ IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION );\r
+\r
+ IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.",\r
+ pOperation->pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation,\r
+ IotTaskPool_strerror( taskPoolStatus ) );\r
+\r
+ status = IOT_MQTT_SCHEDULING_ERROR;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+_mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,\r
+ IotMqttOperationType_t type,\r
+ const uint16_t * pPacketIdentifier )\r
+{\r
+ bool waitable = false;\r
+ IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+ _mqttOperation_t * pResult = NULL;\r
+ IotLink_t * pResultLink = NULL;\r
+ _operationMatchParam_t param = { .type = type, .pPacketIdentifier = pPacketIdentifier };\r
+\r
+ if( pPacketIdentifier != NULL )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response "\r
+ "with packet identifier %hu.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( type ),\r
+ *pPacketIdentifier );\r
+ }\r
+ else\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( type ) );\r
+ }\r
+\r
+ /* Find and remove the first matching element in the list. */\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+ pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ),\r
+ NULL,\r
+ _mqttOperation_match,\r
+ ¶m );\r
+\r
+ /* Check if a match was found. */\r
+ if( pResultLink != NULL )\r
+ {\r
+ /* Get operation pointer and check if it is waitable. */\r
+ pResult = IotLink_Container( _mqttOperation_t, pResultLink, link );\r
+ waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
+\r
+ /* Check if the matched operation is a PUBLISH with retry. If it is, cancel\r
+ * the retry job. */\r
+ if( pResult->u.operation.retry.limit > 0 )\r
+ {\r
+ taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
+ pResult->job,\r
+ NULL );\r
+\r
+ /* If the retry job could not be canceled, then it is currently\r
+ * executing. Ignore the operation. */\r
+ if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
+ {\r
+ pResult = NULL;\r
+ }\r
+ else\r
+ {\r
+ /* Check job reference counts. A waitable operation should have a\r
+ * count of 2; a non-waitable operation should have a count of 1. */\r
+ IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ /* An operation with no retry in the pending responses list should\r
+ * always have a job reference of 1. */\r
+ IotMqtt_Assert( pResult->u.operation.jobReference == 1 );\r
+\r
+ /* Increment job references of a waitable operation to prevent Wait from\r
+ * destroying this operation if it times out. */\r
+ if( waitable == true )\r
+ {\r
+ ( pResult->u.operation.jobReference )++;\r
+\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( type ),\r
+ pResult,\r
+ ( long int ) ( pResult->u.operation.jobReference - 1 ),\r
+ ( long int ) ( pResult->u.operation.jobReference ) );\r
+ }\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( pResult != NULL )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Found operation %s." ,\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( type ) );\r
+\r
+ /* Remove the matched operation from the list. */\r
+ IotListDouble_Remove( &( pResult->link ) );\r
+ }\r
+ else\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Operation %s not found.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( type ) );\r
+ }\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ return pResult;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_Notify( _mqttOperation_t * pOperation )\r
+{\r
+ IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR;\r
+ _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
+\r
+ /* Check if operation is waitable. */\r
+ bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
+\r
+ /* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected\r
+ * subscriptions are removed by the deserializer, so not removed here. */\r
+ if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
+ {\r
+ switch( pOperation->u.operation.status )\r
+ {\r
+ case IOT_MQTT_SUCCESS:\r
+ break;\r
+\r
+ case IOT_MQTT_SERVER_REFUSED:\r
+ break;\r
+\r
+ default:\r
+ _IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection,\r
+ pOperation->u.operation.packetIdentifier,\r
+ -1 );\r
+ break;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Schedule callback invocation for non-waitable operation. */\r
+ if( waitable == false )\r
+ {\r
+ /* Non-waitable operation should have job reference of 1. */\r
+ IotMqtt_Assert( pOperation->u.operation.jobReference == 1 );\r
+\r
+ /* Schedule an invocation of the callback. */\r
+ if( pOperation->u.operation.notify.callback.function != NULL )\r
+ {\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ status = _IotMqtt_ScheduleOperation( pOperation,\r
+ _IotMqtt_ProcessCompletedOperation,\r
+ 0 );\r
+\r
+ if( status == IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.",\r
+ pOperation->pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+\r
+ /* Place the scheduled operation back in the list of operations pending\r
+ * processing. */\r
+ if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
+ {\r
+ IotListDouble_Remove( &( pOperation->link ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
+ &( pOperation->link ) );\r
+ }\r
+ else\r
+ {\r
+ IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.",\r
+ pOperation->pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+ }\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Operations that weren't scheduled may be destroyed. */\r
+ if( status == IOT_MQTT_SCHEDULING_ERROR )\r
+ {\r
+ /* Decrement reference count of operations not scheduled. */\r
+ if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )\r
+ {\r
+ _IotMqtt_DestroyOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Post to a waitable operation's semaphore. */\r
+ if( waitable == true )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "\r
+ "notified of completion.",\r
+ pOperation->pMqttConnection,\r
+ IotMqtt_OperationType( pOperation->u.operation.type ),\r
+ pOperation );\r
+\r
+ IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ IotMqtt_Assert( status == IOT_MQTT_SUCCESS );\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r