--- /dev/null
+/*\r
+ * Amazon FreeRTOS MQTT V2.0.0\r
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ *\r
+ * http://aws.amazon.com/freertos\r
+ * http://www.FreeRTOS.org\r
+ */\r
+\r
+/**\r
+ * @file iot_mqtt_api.c\r
+ * @brief Implements most user-facing functions of the MQTT library.\r
+ */\r
+\r
+/* The config header is always included first. */\r
+#include "iot_config.h"\r
+\r
+/* Standard includes. */\r
+#include <string.h>\r
+\r
+/* Error handling include. */\r
+#include "private/iot_error.h"\r
+\r
+/* MQTT internal include. */\r
+#include "private/iot_mqtt_internal.h"\r
+\r
+/* Platform layer includes. */\r
+#include "platform/iot_clock.h"\r
+#include "platform/iot_threads.h"\r
+\r
+/* Validate MQTT configuration settings. */\r
+#if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1\r
+ #error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1."\r
+#endif\r
+#if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1\r
+ #error "IOT_MQTT_ENABLE_METRICS must be 0 or 1."\r
+#endif\r
+#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1\r
+ #error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1."\r
+#endif\r
+#if IOT_MQTT_RESPONSE_WAIT_MS <= 0\r
+ #error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative."\r
+#endif\r
+#if IOT_MQTT_RETRY_MS_CEILING <= 0\r
+ #error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative."\r
+#endif\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief Set the unsubscribed flag of an MQTT subscription.\r
+ *\r
+ * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
+ * @param[in] pMatch Not used.\r
+ *\r
+ * @return Always returns `true`.\r
+ */\r
+static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
+ void * pMatch );\r
+\r
+/**\r
+ * @brief Destroy an MQTT subscription if its reference count is 0.\r
+ *\r
+ * @param[in] pData The subscription to destroy. This parameter is of type\r
+ * `void*` for compatibility with [free]\r
+ * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
+ */\r
+static void _mqttSubscription_tryDestroy( void * pData );\r
+\r
+/**\r
+ * @brief Decrement the reference count of an MQTT operation and attempt to\r
+ * destroy it.\r
+ *\r
+ * @param[in] pData The operation data to destroy. This parameter is of type\r
+ * `void*` for compatibility with [free]\r
+ * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
+ */\r
+static void _mqttOperation_tryDestroy( void * pData );\r
+\r
+/**\r
+ * @brief Create a keep-alive job for an MQTT connection.\r
+ *\r
+ * @param[in] pNetworkInfo User-provided network information for the new\r
+ * connection.\r
+ * @param[in] keepAliveSeconds User-provided keep-alive interval.\r
+ * @param[out] pMqttConnection The MQTT connection associated with the keep-alive.\r
+ *\r
+ * @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
+ */\r
+static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ uint16_t keepAliveSeconds,\r
+ _mqttConnection_t * pMqttConnection );\r
+\r
+/**\r
+ * @brief Creates a new MQTT connection and initializes its members.\r
+ *\r
+ * @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server.\r
+ * @param[in] pNetworkInfo User-provided network information for the new\r
+ * connection.\r
+ * @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection.\r
+ *\r
+ * @return Pointer to a newly-created MQTT connection; `NULL` on failure.\r
+ */\r
+static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
+ const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ uint16_t keepAliveSeconds );\r
+\r
+/**\r
+ * @brief Destroys the members of an MQTT connection.\r
+ *\r
+ * @param[in] pMqttConnection Which connection to destroy.\r
+ */\r
+static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection );\r
+\r
+/**\r
+ * @brief The common component of both @ref mqtt_function_subscribe and @ref\r
+ * mqtt_function_unsubscribe.\r
+ *\r
+ * See @ref mqtt_function_subscribe or @ref mqtt_function_unsubscribe for a\r
+ * description of the parameters and return values.\r
+ */\r
+static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
+ IotMqttConnection_t mqttConnection,\r
+ const IotMqttSubscription_t * pSubscriptionList,\r
+ size_t subscriptionCount,\r
+ uint32_t flags,\r
+ const IotMqttCallbackInfo_t * pCallbackInfo,\r
+ IotMqttOperation_t * pOperationReference );\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
+ void * pMatch )\r
+{\r
+ /* Because this function is called from a container function, the given link\r
+ * must never be NULL. */\r
+ IotMqtt_Assert( pSubscriptionLink != NULL );\r
+\r
+ _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
+ pSubscriptionLink,\r
+ link );\r
+\r
+ /* Silence warnings about unused parameters. */\r
+ ( void ) pMatch;\r
+\r
+ /* Set the unsubscribed flag. */\r
+ pSubscription->unsubscribed = true;\r
+\r
+ return true;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _mqttSubscription_tryDestroy( void * pData )\r
+{\r
+ _mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData;\r
+\r
+ /* Reference count must not be negative. */\r
+ IotMqtt_Assert( pSubscription->references >= 0 );\r
+\r
+ /* Unsubscribed flag should be set. */\r
+ IotMqtt_Assert( pSubscription->unsubscribed == true );\r
+\r
+ /* Free the subscription if it has no references. */\r
+ if( pSubscription->references == 0 )\r
+ {\r
+ IotMqtt_FreeSubscription( pSubscription );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _mqttOperation_tryDestroy( void * pData )\r
+{\r
+ _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData;\r
+ IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+\r
+ /* Incoming PUBLISH operations may always be freed. */\r
+ if( pOperation->incomingPublish == true )\r
+ {\r
+ /* Cancel the incoming PUBLISH operation's job. */\r
+ taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
+ pOperation->job,\r
+ NULL );\r
+\r
+ /* If the operation's job was not canceled, it must be already executing.\r
+ * Any other return value is invalid. */\r
+ IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
+ ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
+\r
+ /* Check if the incoming PUBLISH job was canceled. */\r
+ if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
+ {\r
+ /* Job was canceled. Process incoming PUBLISH now to clean up. */\r
+ _IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL,\r
+ pOperation->job,\r
+ pOperation );\r
+ }\r
+ else\r
+ {\r
+ /* The executing job will process the PUBLISH, so nothing is done here. */\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ /* Decrement reference count and destroy operation if possible. */\r
+ if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true )\r
+ {\r
+ _IotMqtt_DestroyOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ uint16_t keepAliveSeconds,\r
+ _mqttConnection_t * pMqttConnection )\r
+{\r
+ bool status = true;\r
+ IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
+ IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS;\r
+\r
+ /* Network information is not used when MQTT packet serializers are disabled. */\r
+ ( void ) pNetworkInfo;\r
+\r
+ /* Default PINGREQ serializer function. */\r
+ IotMqttError_t ( * serializePingreq )( uint8_t **,\r
+ size_t * ) = _IotMqtt_SerializePingreq;\r
+\r
+ /* Convert the keep-alive interval to milliseconds. */\r
+ pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;\r
+ pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+\r
+ /* Choose a PINGREQ serializer function. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( pNetworkInfo->pMqttSerializer != NULL )\r
+ {\r
+ if( pNetworkInfo->pMqttSerializer->serialize.pingreq != NULL )\r
+ {\r
+ serializePingreq = pNetworkInfo->pMqttSerializer->serialize.pingreq;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ /* Generate a PINGREQ packet. */\r
+ serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),\r
+ &( pMqttConnection->pingreqPacketSize ) );\r
+\r
+ if( serializeStatus != IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogError( "Failed to allocate PINGREQ packet for new connection." );\r
+\r
+ status = false;\r
+ }\r
+ else\r
+ {\r
+ /* Create the task pool job that processes keep-alive. */\r
+ jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
+ pMqttConnection,\r
+ &( pMqttConnection->keepAliveJobStorage ),\r
+ &( pMqttConnection->keepAliveJob ) );\r
+\r
+ /* Task pool job creation for a pre-allocated job should never fail.\r
+ * Abort the program if it does. */\r
+ if( jobStatus != IOT_TASKPOOL_SUCCESS )\r
+ {\r
+ IotLogError( "Failed to create keep-alive job for new connection." );\r
+\r
+ IotMqtt_Assert( false );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Keep-alive references its MQTT connection, so increment reference. */\r
+ ( pMqttConnection->references )++;\r
+ }\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
+ const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ uint16_t keepAliveSeconds )\r
+{\r
+ IOT_FUNCTION_ENTRY( bool, true );\r
+ _mqttConnection_t * pMqttConnection = NULL;\r
+ bool referencesMutexCreated = false, subscriptionMutexCreated = false;\r
+\r
+ /* Allocate memory for the new MQTT connection. */\r
+ pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) );\r
+\r
+ if( pMqttConnection == NULL )\r
+ {\r
+ IotLogError( "Failed to allocate memory for new connection." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( false );\r
+ }\r
+ else\r
+ {\r
+ /* Clear the MQTT connection, then copy the MQTT server mode, network\r
+ * interface, and disconnect callback. */\r
+ ( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) );\r
+ pMqttConnection->awsIotMqttMode = awsIotMqttMode;\r
+ pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface;\r
+ pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback;\r
+\r
+ /* Start a new MQTT connection with a reference count of 1. */\r
+ pMqttConnection->references = 1;\r
+ }\r
+\r
+ /* Create the references mutex for a new connection. It is a recursive mutex. */\r
+ referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true );\r
+\r
+ if( referencesMutexCreated == false )\r
+ {\r
+ IotLogError( "Failed to create references mutex for new connection." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( false );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Create the subscription mutex for a new connection. */\r
+ subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false );\r
+\r
+ if( subscriptionMutexCreated == false )\r
+ {\r
+ IotLogError( "Failed to create subscription mutex for new connection." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( false );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Create the new connection's subscription and operation lists. */\r
+ IotListDouble_Create( &( pMqttConnection->subscriptionList ) );\r
+ IotListDouble_Create( &( pMqttConnection->pendingProcessing ) );\r
+ IotListDouble_Create( &( pMqttConnection->pendingResponse ) );\r
+\r
+ /* AWS IoT service limits set minimum and maximum values for keep-alive interval.\r
+ * Adjust the user-provided keep-alive interval based on these requirements. */\r
+ if( awsIotMqttMode == true )\r
+ {\r
+ if( keepAliveSeconds < AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE )\r
+ {\r
+ keepAliveSeconds = AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE;\r
+ }\r
+ else if( keepAliveSeconds > AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE )\r
+ {\r
+ keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;\r
+ }\r
+ else if( keepAliveSeconds == 0 )\r
+ {\r
+ keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check if keep-alive is active for this connection. */\r
+ if( keepAliveSeconds != 0 )\r
+ {\r
+ if( _createKeepAliveJob( pNetworkInfo,\r
+ keepAliveSeconds,\r
+ pMqttConnection ) == false )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( false );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Clean up mutexes and connection if this function failed. */\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+ if( status == false )\r
+ {\r
+ if( subscriptionMutexCreated == true )\r
+ {\r
+ IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( referencesMutexCreated == true )\r
+ {\r
+ IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( pMqttConnection != NULL )\r
+ {\r
+ IotMqtt_FreeConnection( pMqttConnection );\r
+ pMqttConnection = NULL;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ return pMqttConnection;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )\r
+{\r
+ IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
+\r
+ /* Clean up keep-alive if still allocated. */\r
+ if( pMqttConnection->keepAliveMs != 0 )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
+\r
+ _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+\r
+ /* Clear data about the keep-alive. */\r
+ pMqttConnection->keepAliveMs = 0;\r
+ pMqttConnection->pPingreqPacket = NULL;\r
+ pMqttConnection->pingreqPacketSize = 0;\r
+\r
+ /* Decrement reference count. */\r
+ pMqttConnection->references--;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* A connection to be destroyed should have no keep-alive and at most 1\r
+ * reference. */\r
+ IotMqtt_Assert( pMqttConnection->references <= 1 );\r
+ IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );\r
+ IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );\r
+ IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );\r
+\r
+ /* Remove all subscriptions. */\r
+ IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
+ IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
+ _mqttSubscription_setUnsubscribe,\r
+ NULL,\r
+ _mqttSubscription_tryDestroy,\r
+ offsetof( _mqttSubscription_t, link ) );\r
+ IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+ /* Destroy an owned network connection. */\r
+ if( pMqttConnection->ownNetworkConnection == true )\r
+ {\r
+ networkStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );\r
+\r
+ if( networkStatus != IOT_NETWORK_SUCCESS )\r
+ {\r
+ IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",\r
+ pMqttConnection );\r
+ }\r
+ else\r
+ {\r
+ IotLogInfo( "(MQTT connection %p) Network connection destroyed.",\r
+ pMqttConnection );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Destroy mutexes. */\r
+ IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
+ IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+ IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );\r
+\r
+ /* Free connection. */\r
+ IotMqtt_FreeConnection( pMqttConnection );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
+ IotMqttConnection_t mqttConnection,\r
+ const IotMqttSubscription_t * pSubscriptionList,\r
+ size_t subscriptionCount,\r
+ uint32_t flags,\r
+ const IotMqttCallbackInfo_t * pCallbackInfo,\r
+ IotMqttOperation_t * pOperationReference )\r
+{\r
+ IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
+ _mqttOperation_t * pSubscriptionOperation = NULL;\r
+\r
+ /* Subscription serializer function. */\r
+ IotMqttError_t ( * serializeSubscription )( const IotMqttSubscription_t *,\r
+ size_t,\r
+ uint8_t **,\r
+ size_t *,\r
+ uint16_t * ) = NULL;\r
+\r
+ /* This function should only be called for subscribe or unsubscribe. */\r
+ IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||\r
+ ( operation == IOT_MQTT_UNSUBSCRIBE ) );\r
+\r
+ /* Check that all elements in the subscription list are valid. */\r
+ if( _IotMqtt_ValidateSubscriptionList( operation,\r
+ mqttConnection->awsIotMqttMode,\r
+ pSubscriptionList,\r
+ subscriptionCount ) == false )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check that a reference pointer is provided for a waitable operation. */\r
+ if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
+ {\r
+ if( pOperationReference == NULL )\r
+ {\r
+ IotLogError( "Reference must be provided for a waitable %s.",\r
+ IotMqtt_OperationType( operation ) );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Choose a subscription serialize function. */\r
+ if( operation == IOT_MQTT_SUBSCRIBE )\r
+ {\r
+ serializeSubscription = _IotMqtt_SerializeSubscribe;\r
+\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( mqttConnection->pSerializer != NULL )\r
+ {\r
+ if( mqttConnection->pSerializer->serialize.subscribe != NULL )\r
+ {\r
+ serializeSubscription = mqttConnection->pSerializer->serialize.subscribe;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+ }\r
+ else\r
+ {\r
+ serializeSubscription = _IotMqtt_SerializeUnsubscribe;\r
+\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( mqttConnection->pSerializer != NULL )\r
+ {\r
+ if( mqttConnection->pSerializer->serialize.unsubscribe != NULL )\r
+ {\r
+ serializeSubscription = mqttConnection->pSerializer->serialize.unsubscribe;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+ }\r
+\r
+ /* Remove the MQTT subscription list for an UNSUBSCRIBE. */\r
+ if( operation == IOT_MQTT_UNSUBSCRIBE )\r
+ {\r
+ _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,\r
+ pSubscriptionList,\r
+ subscriptionCount );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Create a subscription operation. */\r
+ status = _IotMqtt_CreateOperation( mqttConnection,\r
+ flags,\r
+ pCallbackInfo,\r
+ &pSubscriptionOperation );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Check the subscription operation data and set the operation type. */\r
+ IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+ IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );\r
+ pSubscriptionOperation->u.operation.type = operation;\r
+\r
+ /* Generate a subscription packet from the subscription list. */\r
+ status = serializeSubscription( pSubscriptionList,\r
+ subscriptionCount,\r
+ &( pSubscriptionOperation->u.operation.pMqttPacket ),\r
+ &( pSubscriptionOperation->u.operation.packetSize ),\r
+ &( pSubscriptionOperation->u.operation.packetIdentifier ) );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Check the serialized MQTT packet. */\r
+ IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );\r
+ IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );\r
+\r
+ /* Add the subscription list for a SUBSCRIBE. */\r
+ if( operation == IOT_MQTT_SUBSCRIBE )\r
+ {\r
+ status = _IotMqtt_AddSubscriptions( mqttConnection,\r
+ pSubscriptionOperation->u.operation.packetIdentifier,\r
+ pSubscriptionList,\r
+ subscriptionCount );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+ }\r
+\r
+ /* Set the reference, if provided. */\r
+ if( pOperationReference != NULL )\r
+ {\r
+ *pOperationReference = pSubscriptionOperation;\r
+ }\r
+\r
+ /* Schedule the subscription operation for network transmission. */\r
+ status = _IotMqtt_ScheduleOperation( pSubscriptionOperation,\r
+ _IotMqtt_ProcessSend,\r
+ 0 );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",\r
+ mqttConnection,\r
+ IotMqtt_OperationType( operation ) );\r
+\r
+ if( operation == IOT_MQTT_SUBSCRIBE )\r
+ {\r
+ _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,\r
+ pSubscriptionOperation->u.operation.packetIdentifier,\r
+ -1 );\r
+ }\r
+\r
+ /* Clear the previously set (and now invalid) reference. */\r
+ if( pOperationReference != NULL )\r
+ {\r
+ *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;\r
+ }\r
+\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Clean up if this function failed. */\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ if( pSubscriptionOperation != NULL )\r
+ {\r
+ _IotMqtt_DestroyOperation( pSubscriptionOperation );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ status = IOT_MQTT_STATUS_PENDING;\r
+\r
+ IotLogInfo( "(MQTT connection %p) %s operation scheduled.",\r
+ mqttConnection,\r
+ IotMqtt_OperationType( operation ) );\r
+ }\r
+\r
+ IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
+{\r
+ bool disconnected = false;\r
+\r
+ /* Lock the mutex protecting the reference count. */\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* Reference count must not be negative. */\r
+ IotMqtt_Assert( pMqttConnection->references >= 0 );\r
+\r
+ /* Read connection status. */\r
+ disconnected = pMqttConnection->disconnected;\r
+\r
+ /* Increment the connection's reference count if it is not disconnected. */\r
+ if( disconnected == false )\r
+ {\r
+ ( pMqttConnection->references )++;\r
+ IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
+ pMqttConnection,\r
+ ( long int ) pMqttConnection->references - 1,\r
+ ( long int ) pMqttConnection->references );\r
+ }\r
+ else\r
+ {\r
+ IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );\r
+ }\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ return( disconnected == false );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
+{\r
+ bool destroyConnection = false;\r
+\r
+ /* Lock the mutex protecting the reference count. */\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* Decrement reference count. It must not be negative. */\r
+ ( pMqttConnection->references )--;\r
+ IotMqtt_Assert( pMqttConnection->references >= 0 );\r
+\r
+ IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
+ pMqttConnection,\r
+ ( long int ) pMqttConnection->references + 1,\r
+ ( long int ) pMqttConnection->references );\r
+\r
+ /* Check if this connection may be destroyed. */\r
+ if( pMqttConnection->references == 0 )\r
+ {\r
+ destroyConnection = true;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* Destroy an unreferenced MQTT connection. */\r
+ if( destroyConnection == true )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",\r
+ pMqttConnection );\r
+ _destroyMqttConnection( pMqttConnection );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Init( void )\r
+{\r
+ IotMqttError_t status = IOT_MQTT_SUCCESS;\r
+\r
+ /* Call any additional serializer initialization function if serializer\r
+ * overrides are enabled. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ #ifdef _IotMqtt_InitSerializeAdditional\r
+ if( _IotMqtt_InitSerializeAdditional() == false )\r
+ {\r
+ status = IOT_MQTT_INIT_FAILED;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ /* Log initialization status. */\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogError( "Failed to initialize MQTT library serializer. " );\r
+ }\r
+ else\r
+ {\r
+ IotLogInfo( "MQTT library successfully initialized." );\r
+ }\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void IotMqtt_Cleanup( void )\r
+{\r
+ /* Call any additional serializer cleanup initialization function if serializer\r
+ * overrides are enabled. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ #ifdef _IotMqtt_CleanupSerializeAdditional\r
+ _IotMqtt_CleanupSerializeAdditional();\r
+ #endif\r
+ #endif\r
+\r
+ IotLogInfo( "MQTT library cleanup done." );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ const IotMqttConnectInfo_t * pConnectInfo,\r
+ uint32_t timeoutMs,\r
+ IotMqttConnection_t * const pMqttConnection )\r
+{\r
+ IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
+ bool networkCreated = false, ownNetworkConnection = false;\r
+ IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
+ IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
+ void * pNetworkConnection = NULL;\r
+ _mqttOperation_t * pOperation = NULL;\r
+ _mqttConnection_t * pNewMqttConnection = NULL;\r
+\r
+ /* Default CONNECT serializer function. */\r
+ IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,\r
+ uint8_t **,\r
+ size_t * ) = _IotMqtt_SerializeConnect;\r
+\r
+ /* Network info must not be NULL. */\r
+ if( pNetworkInfo == NULL )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Validate network interface and connect info. */\r
+ if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* If will info is provided, check that it is valid. */\r
+ if( pConnectInfo->pWillInfo != NULL )\r
+ {\r
+ if( _IotMqtt_ValidatePublish( pConnectInfo->awsIotMqttMode,\r
+ pConnectInfo->pWillInfo ) == false )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else if( pConnectInfo->pWillInfo->payloadLength > UINT16_MAX )\r
+ {\r
+ /* Will message payloads cannot be larger than 65535. This restriction\r
+ * applies only to will messages, and not normal PUBLISH messages. */\r
+ IotLogError( "Will payload cannot be larger than 65535." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* If previous subscriptions are provided, check that they are valid. */\r
+ if( pConnectInfo->cleanSession == false )\r
+ {\r
+ if( pConnectInfo->pPreviousSubscriptions != NULL )\r
+ {\r
+ if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,\r
+ pConnectInfo->awsIotMqttMode,\r
+ pConnectInfo->pPreviousSubscriptions,\r
+ pConnectInfo->previousSubscriptionCount ) == false )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Create a new MQTT connection if requested. Otherwise, copy the existing\r
+ * network connection. */\r
+ if( pNetworkInfo->createNetworkConnection == true )\r
+ {\r
+ networkStatus = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,\r
+ pNetworkInfo->u.setup.pNetworkCredentialInfo,\r
+ &pNetworkConnection );\r
+\r
+ if( networkStatus == IOT_NETWORK_SUCCESS )\r
+ {\r
+ networkCreated = true;\r
+\r
+ /* This MQTT connection owns the network connection it created and\r
+ * should destroy it on cleanup. */\r
+ ownNetworkConnection = true;\r
+ }\r
+ else\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ pNetworkConnection = pNetworkInfo->u.pNetworkConnection;\r
+ networkCreated = true;\r
+ }\r
+\r
+ IotLogInfo( "Establishing new MQTT connection." );\r
+\r
+ /* Initialize a new MQTT connection object. */\r
+ pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,\r
+ pNetworkInfo,\r
+ pConnectInfo->keepAliveSeconds );\r
+\r
+ if( pNewMqttConnection == NULL )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
+ }\r
+ else\r
+ {\r
+ /* Set the network connection associated with the MQTT connection. */\r
+ pNewMqttConnection->pNetworkConnection = pNetworkConnection;\r
+ pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;\r
+\r
+ /* Set the MQTT packet serializer overrides. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;\r
+ #endif\r
+ }\r
+\r
+ /* Set the MQTT receive callback. */\r
+ networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,\r
+ IotMqtt_ReceiveCallback,\r
+ pNewMqttConnection );\r
+\r
+ if( networkStatus != IOT_NETWORK_SUCCESS )\r
+ {\r
+ IotLogError( "Failed to set MQTT network receive callback." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Create a CONNECT operation. */\r
+ status = _IotMqtt_CreateOperation( pNewMqttConnection,\r
+ IOT_MQTT_FLAG_WAITABLE,\r
+ NULL,\r
+ &pOperation );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Ensure the members set by operation creation and serialization\r
+ * are appropriate for a blocking CONNECT. */\r
+ IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+ IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
+ == IOT_MQTT_FLAG_WAITABLE );\r
+ IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+\r
+ /* Set the operation type. */\r
+ pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
+\r
+ /* Add previous session subscriptions. */\r
+ if( pConnectInfo->pPreviousSubscriptions != NULL )\r
+ {\r
+ /* Previous subscription count should have been validated as nonzero. */\r
+ IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );\r
+\r
+ status = _IotMqtt_AddSubscriptions( pNewMqttConnection,\r
+ 2,\r
+ pConnectInfo->pPreviousSubscriptions,\r
+ pConnectInfo->previousSubscriptionCount );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Choose a CONNECT serializer function. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( pNewMqttConnection->pSerializer != NULL )\r
+ {\r
+ if( pNewMqttConnection->pSerializer->serialize.connect != NULL )\r
+ {\r
+ serializeConnect = pNewMqttConnection->pSerializer->serialize.connect;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ /* Convert the connect info and will info objects to an MQTT CONNECT packet. */\r
+ status = serializeConnect( pConnectInfo,\r
+ &( pOperation->u.operation.pMqttPacket ),\r
+ &( pOperation->u.operation.packetSize ) );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check the serialized MQTT packet. */\r
+ IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
+ IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
+\r
+ /* Add the CONNECT operation to the send queue for network transmission. */\r
+ status = _IotMqtt_ScheduleOperation( pOperation,\r
+ _IotMqtt_ProcessSend,\r
+ 0 );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogError( "Failed to enqueue CONNECT for sending." );\r
+ }\r
+ else\r
+ {\r
+ /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */\r
+ status = IotMqtt_Wait( pOperation,\r
+ timeoutMs );\r
+\r
+ /* The call to wait cleans up the CONNECT operation, so set the pointer\r
+ * to NULL. */\r
+ pOperation = NULL;\r
+ }\r
+\r
+ /* When a connection is successfully established, schedule keep-alive job. */\r
+ if( status == IOT_MQTT_SUCCESS )\r
+ {\r
+ /* Check if a keep-alive job should be scheduled. */\r
+ if( pNewMqttConnection->keepAliveMs != 0 )\r
+ {\r
+ IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
+\r
+ taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
+ pNewMqttConnection->keepAliveJob,\r
+ pNewMqttConnection->nextKeepAliveMs );\r
+\r
+ if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogError( "Failed to establish new MQTT connection, error %s.",\r
+ IotMqtt_strerror( status ) );\r
+\r
+ /* The network connection must be closed if it was created. */\r
+ if( networkCreated == true )\r
+ {\r
+ networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );\r
+\r
+ if( networkStatus != IOT_NETWORK_SUCCESS )\r
+ {\r
+ IotLogWarn( "Failed to close network connection." );\r
+ }\r
+ else\r
+ {\r
+ IotLogInfo( "Network connection closed on error." );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( pOperation != NULL )\r
+ {\r
+ _IotMqtt_DestroyOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( pNewMqttConnection != NULL )\r
+ {\r
+ _destroyMqttConnection( pNewMqttConnection );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ IotLogInfo( "New MQTT connection %p established.", pMqttConnection );\r
+\r
+ /* Set the output parameter. */\r
+ *pMqttConnection = pNewMqttConnection;\r
+ }\r
+\r
+ IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,\r
+ uint32_t flags )\r
+{\r
+ bool disconnected = false;\r
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+ _mqttOperation_t * pOperation = NULL;\r
+\r
+ IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );\r
+\r
+ /* Read the connection status. */\r
+ IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
+ disconnected = mqttConnection->disconnected;\r
+ IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
+\r
+ /* Only send a DISCONNECT packet if the connection is active and the "cleanup only"\r
+ * flag is not set. */\r
+ if( disconnected == false )\r
+ {\r
+ if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == 0 )\r
+ {\r
+ /* Create a DISCONNECT operation. This function blocks until the DISCONNECT\r
+ * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */\r
+ status = _IotMqtt_CreateOperation( mqttConnection,\r
+ IOT_MQTT_FLAG_WAITABLE,\r
+ NULL,\r
+ &pOperation );\r
+\r
+ if( status == IOT_MQTT_SUCCESS )\r
+ {\r
+ /* Ensure that the members set by operation creation and serialization\r
+ * are appropriate for a blocking DISCONNECT. */\r
+ IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+ IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
+ == IOT_MQTT_FLAG_WAITABLE );\r
+ IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+\r
+ /* Set the operation type. */\r
+ pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
+\r
+ /* Choose a disconnect serializer. */\r
+ IotMqttError_t ( * serializeDisconnect )( uint8_t **,\r
+ size_t * ) = _IotMqtt_SerializeDisconnect;\r
+\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( mqttConnection->pSerializer != NULL )\r
+ {\r
+ if( mqttConnection->pSerializer->serialize.disconnect != NULL )\r
+ {\r
+ serializeDisconnect = mqttConnection->pSerializer->serialize.disconnect;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ /* Generate a DISCONNECT packet. */\r
+ status = serializeDisconnect( &( pOperation->u.operation.pMqttPacket ),\r
+ &( pOperation->u.operation.packetSize ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( status == IOT_MQTT_SUCCESS )\r
+ {\r
+ /* Check the serialized MQTT packet. */\r
+ IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
+ IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
+\r
+ /* Schedule the DISCONNECT operation for network transmission. */\r
+ if( _IotMqtt_ScheduleOperation( pOperation,\r
+ _IotMqtt_ProcessSend,\r
+ 0 ) != IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogWarn( "(MQTT connection %p) Failed to schedule DISCONNECT for sending.",\r
+ mqttConnection );\r
+ _IotMqtt_DestroyOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ /* Wait a short time for the DISCONNECT packet to be transmitted. */\r
+ status = IotMqtt_Wait( pOperation,\r
+ IOT_MQTT_RESPONSE_WAIT_MS );\r
+\r
+ /* A wait on DISCONNECT should only ever return SUCCESS, TIMEOUT,\r
+ * or NETWORK ERROR. */\r
+ if( status == IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );\r
+ }\r
+ else\r
+ {\r
+ IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||\r
+ ( status == IOT_MQTT_NETWORK_ERROR ) );\r
+\r
+ IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",\r
+ mqttConnection,\r
+ IotMqtt_strerror( status ) );\r
+ }\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Close the underlying network connection. This also cleans up keep-alive. */\r
+ _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,\r
+ mqttConnection );\r
+\r
+ /* Check if the connection may be destroyed. */\r
+ IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
+\r
+ /* At this point, the connection should be marked disconnected. */\r
+ IotMqtt_Assert( mqttConnection->disconnected == true );\r
+\r
+ /* Attempt cancel and destroy each operation in the connection's lists. */\r
+ IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),\r
+ _mqttOperation_tryDestroy,\r
+ offsetof( _mqttOperation_t, link ) );\r
+\r
+ IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),\r
+ _mqttOperation_tryDestroy,\r
+ offsetof( _mqttOperation_t, link ) );\r
+\r
+ IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
+\r
+ /* Decrement the connection reference count and destroy it if possible. */\r
+ _IotMqtt_DecrementConnectionReferences( mqttConnection );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,\r
+ const IotMqttSubscription_t * pSubscriptionList,\r
+ size_t subscriptionCount,\r
+ uint32_t flags,\r
+ const IotMqttCallbackInfo_t * pCallbackInfo,\r
+ IotMqttOperation_t * pSubscribeOperation )\r
+{\r
+ return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
+ mqttConnection,\r
+ pSubscriptionList,\r
+ subscriptionCount,\r
+ flags,\r
+ pCallbackInfo,\r
+ pSubscribeOperation );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_TimedSubscribe( IotMqttConnection_t mqttConnection,\r
+ const IotMqttSubscription_t * pSubscriptionList,\r
+ size_t subscriptionCount,\r
+ uint32_t flags,\r
+ uint32_t timeoutMs )\r
+{\r
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+ IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
+\r
+ /* Flags are not used, but the parameter is present for future compatibility. */\r
+ ( void ) flags;\r
+\r
+ /* Call the asynchronous SUBSCRIBE function. */\r
+ status = IotMqtt_Subscribe( mqttConnection,\r
+ pSubscriptionList,\r
+ subscriptionCount,\r
+ IOT_MQTT_FLAG_WAITABLE,\r
+ NULL,\r
+ &subscribeOperation );\r
+\r
+ /* Wait for the SUBSCRIBE operation to complete. */\r
+ if( status == IOT_MQTT_STATUS_PENDING )\r
+ {\r
+ status = IotMqtt_Wait( subscribeOperation, timeoutMs );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Ensure that a status was set. */\r
+ IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,\r
+ const IotMqttSubscription_t * pSubscriptionList,\r
+ size_t subscriptionCount,\r
+ uint32_t flags,\r
+ const IotMqttCallbackInfo_t * pCallbackInfo,\r
+ IotMqttOperation_t * pUnsubscribeOperation )\r
+{\r
+ return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
+ mqttConnection,\r
+ pSubscriptionList,\r
+ subscriptionCount,\r
+ flags,\r
+ pCallbackInfo,\r
+ pUnsubscribeOperation );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_TimedUnsubscribe( IotMqttConnection_t mqttConnection,\r
+ const IotMqttSubscription_t * pSubscriptionList,\r
+ size_t subscriptionCount,\r
+ uint32_t flags,\r
+ uint32_t timeoutMs )\r
+{\r
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+ IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
+\r
+ /* Flags are not used, but the parameter is present for future compatibility. */\r
+ ( void ) flags;\r
+\r
+ /* Call the asynchronous UNSUBSCRIBE function. */\r
+ status = IotMqtt_Unsubscribe( mqttConnection,\r
+ pSubscriptionList,\r
+ subscriptionCount,\r
+ IOT_MQTT_FLAG_WAITABLE,\r
+ NULL,\r
+ &unsubscribeOperation );\r
+\r
+ /* Wait for the UNSUBSCRIBE operation to complete. */\r
+ if( status == IOT_MQTT_STATUS_PENDING )\r
+ {\r
+ status = IotMqtt_Wait( unsubscribeOperation, timeoutMs );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Ensure that a status was set. */\r
+ IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,\r
+ const IotMqttPublishInfo_t * pPublishInfo,\r
+ uint32_t flags,\r
+ const IotMqttCallbackInfo_t * pCallbackInfo,\r
+ IotMqttOperation_t * pPublishOperation )\r
+{\r
+ IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
+ _mqttOperation_t * pOperation = NULL;\r
+ uint8_t ** pPacketIdentifierHigh = NULL;\r
+\r
+ /* Default PUBLISH serializer function. */\r
+ IotMqttError_t ( * serializePublish )( const IotMqttPublishInfo_t *,\r
+ uint8_t **,\r
+ size_t *,\r
+ uint16_t *,\r
+ uint8_t ** ) = _IotMqtt_SerializePublish;\r
+\r
+ /* Check that the PUBLISH information is valid. */\r
+ if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,\r
+ pPublishInfo ) == false )\r
+ {\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check that no notification is requested for a QoS 0 publish. */\r
+ if( pPublishInfo->qos == IOT_MQTT_QOS_0 )\r
+ {\r
+ if( pCallbackInfo != NULL )\r
+ {\r
+ IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )\r
+ {\r
+ IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ if( pPublishOperation != NULL )\r
+ {\r
+ IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check that a reference pointer is provided for a waitable operation. */\r
+ if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
+ {\r
+ if( pPublishOperation == NULL )\r
+ {\r
+ IotLogError( "Reference must be provided for a waitable PUBLISH." );\r
+\r
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Create a PUBLISH operation. */\r
+ status = _IotMqtt_CreateOperation( mqttConnection,\r
+ flags,\r
+ pCallbackInfo,\r
+ &pOperation );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check the PUBLISH operation data and set the operation type. */\r
+ IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
+ pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;\r
+\r
+ /* Choose a PUBLISH serializer function. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( mqttConnection->pSerializer != NULL )\r
+ {\r
+ if( mqttConnection->pSerializer->serialize.publish != NULL )\r
+ {\r
+ serializePublish = mqttConnection->pSerializer->serialize.publish;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */\r
+ if( mqttConnection->awsIotMqttMode == true )\r
+ {\r
+ pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Generate a PUBLISH packet from pPublishInfo. */\r
+ status = serializePublish( pPublishInfo,\r
+ &( pOperation->u.operation.pMqttPacket ),\r
+ &( pOperation->u.operation.packetSize ),\r
+ &( pOperation->u.operation.packetIdentifier ),\r
+ pPacketIdentifierHigh );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check the serialized MQTT packet. */\r
+ IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
+ IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
+\r
+ /* Initialize PUBLISH retry if retryLimit is set. */\r
+ if( pPublishInfo->retryLimit > 0 )\r
+ {\r
+ /* A QoS 0 PUBLISH may not be retried. */\r
+ if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
+ {\r
+ pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;\r
+ pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Set the reference, if provided. */\r
+ if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
+ {\r
+ if( pPublishOperation != NULL )\r
+ {\r
+ *pPublishOperation = pOperation;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Add the PUBLISH operation to the send queue for network transmission. */\r
+ status = _IotMqtt_ScheduleOperation( pOperation,\r
+ _IotMqtt_ProcessSend,\r
+ 0 );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",\r
+ mqttConnection );\r
+\r
+ /* Clear the previously set (and now invalid) reference. */\r
+ if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
+ {\r
+ if( pPublishOperation != NULL )\r
+ {\r
+ *pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Clean up the PUBLISH operation if this function fails. Otherwise, set the\r
+ * appropriate return code based on QoS. */\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ if( pOperation != NULL )\r
+ {\r
+ _IotMqtt_DestroyOperation( pOperation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ if( pPublishInfo->qos > IOT_MQTT_QOS_0 )\r
+ {\r
+ status = IOT_MQTT_STATUS_PENDING;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",\r
+ mqttConnection );\r
+ }\r
+\r
+ IOT_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_TimedPublish( IotMqttConnection_t mqttConnection,\r
+ const IotMqttPublishInfo_t * pPublishInfo,\r
+ uint32_t flags,\r
+ uint32_t timeoutMs )\r
+{\r
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+ IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER,\r
+ * pPublishOperation = NULL;\r
+\r
+ /* Clear the flags. */\r
+ flags = 0;\r
+\r
+ /* Set the waitable flag and reference for QoS 1 PUBLISH. */\r
+ if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
+ {\r
+ flags = IOT_MQTT_FLAG_WAITABLE;\r
+ pPublishOperation = &publishOperation;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Call the asynchronous PUBLISH function. */\r
+ status = IotMqtt_Publish( mqttConnection,\r
+ pPublishInfo,\r
+ flags,\r
+ NULL,\r
+ pPublishOperation );\r
+\r
+ /* Wait for a queued QoS 1 PUBLISH to complete. */\r
+ if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
+ {\r
+ if( status == IOT_MQTT_STATUS_PENDING )\r
+ {\r
+ status = IotMqtt_Wait( publishOperation, timeoutMs );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,\r
+ uint32_t timeoutMs )\r
+{\r
+ IotMqttError_t status = IOT_MQTT_SUCCESS;\r
+ _mqttConnection_t * pMqttConnection = operation->pMqttConnection;\r
+\r
+ /* Validate the given operation reference. */\r
+ if( _IotMqtt_ValidateOperation( operation ) == false )\r
+ {\r
+ status = IOT_MQTT_BAD_PARAMETER;\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Check the MQTT connection status. */\r
+ if( status == IOT_MQTT_SUCCESS )\r
+ {\r
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ if( pMqttConnection->disconnected == true )\r
+ {\r
+ IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "\r
+ "Operation cannot be waited on.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( operation->u.operation.type ),\r
+ operation );\r
+\r
+ status = IOT_MQTT_NETWORK_ERROR;\r
+ }\r
+ else\r
+ {\r
+ IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( operation->u.operation.type ),\r
+ operation );\r
+ }\r
+\r
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
+\r
+ /* Only wait on an operation if the MQTT connection is active. */\r
+ if( status == IOT_MQTT_SUCCESS )\r
+ {\r
+ if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),\r
+ timeoutMs ) == false )\r
+ {\r
+ status = IOT_MQTT_TIMEOUT;\r
+\r
+ /* Attempt to cancel the job of the timed out operation. */\r
+ ( void ) _IotMqtt_DecrementOperationReferences( operation, true );\r
+\r
+ /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */\r
+ if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
+ {\r
+ IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"\r
+ " subscriptions of timed-out SUBSCRIBE.",\r
+ pMqttConnection,\r
+ operation );\r
+\r
+ _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,\r
+ operation->u.operation.packetIdentifier,\r
+ -1 );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ /* Retrieve the status of the completed operation. */\r
+ status = operation->u.operation.status;\r
+ }\r
+\r
+ IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",\r
+ pMqttConnection,\r
+ IotMqtt_OperationType( operation->u.operation.type ),\r
+ operation,\r
+ IotMqtt_strerror( status ) );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Wait is finished; decrement operation reference count. */\r
+ if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )\r
+ {\r
+ _IotMqtt_DestroyOperation( operation );\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+const char * IotMqtt_strerror( IotMqttError_t status )\r
+{\r
+ const char * pMessage = NULL;\r
+\r
+ switch( status )\r
+ {\r
+ case IOT_MQTT_SUCCESS:\r
+ pMessage = "SUCCESS";\r
+ break;\r
+\r
+ case IOT_MQTT_STATUS_PENDING:\r
+ pMessage = "PENDING";\r
+ break;\r
+\r
+ case IOT_MQTT_INIT_FAILED:\r
+ pMessage = "INITIALIZATION FAILED";\r
+ break;\r
+\r
+ case IOT_MQTT_BAD_PARAMETER:\r
+ pMessage = "BAD PARAMETER";\r
+ break;\r
+\r
+ case IOT_MQTT_NO_MEMORY:\r
+ pMessage = "NO MEMORY";\r
+ break;\r
+\r
+ case IOT_MQTT_NETWORK_ERROR:\r
+ pMessage = "NETWORK ERROR";\r
+ break;\r
+\r
+ case IOT_MQTT_SCHEDULING_ERROR:\r
+ pMessage = "SCHEDULING ERROR";\r
+ break;\r
+\r
+ case IOT_MQTT_BAD_RESPONSE:\r
+ pMessage = "BAD RESPONSE RECEIVED";\r
+ break;\r
+\r
+ case IOT_MQTT_TIMEOUT:\r
+ pMessage = "TIMEOUT";\r
+ break;\r
+\r
+ case IOT_MQTT_SERVER_REFUSED:\r
+ pMessage = "SERVER REFUSED";\r
+ break;\r
+\r
+ case IOT_MQTT_RETRY_NO_RESPONSE:\r
+ pMessage = "NO RESPONSE";\r
+ break;\r
+\r
+ default:\r
+ pMessage = "INVALID STATUS";\r
+ break;\r
+ }\r
+\r
+ return pMessage;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+const char * IotMqtt_OperationType( IotMqttOperationType_t operation )\r
+{\r
+ const char * pMessage = NULL;\r
+\r
+ switch( operation )\r
+ {\r
+ case IOT_MQTT_CONNECT:\r
+ pMessage = "CONNECT";\r
+ break;\r
+\r
+ case IOT_MQTT_PUBLISH_TO_SERVER:\r
+ pMessage = "PUBLISH";\r
+ break;\r
+\r
+ case IOT_MQTT_PUBACK:\r
+ pMessage = "PUBACK";\r
+ break;\r
+\r
+ case IOT_MQTT_SUBSCRIBE:\r
+ pMessage = "SUBSCRIBE";\r
+ break;\r
+\r
+ case IOT_MQTT_UNSUBSCRIBE:\r
+ pMessage = "UNSUBSCRIBE";\r
+ break;\r
+\r
+ case IOT_MQTT_PINGREQ:\r
+ pMessage = "PINGREQ";\r
+ break;\r
+\r
+ case IOT_MQTT_DISCONNECT:\r
+ pMessage = "DISCONNECT";\r
+ break;\r
+\r
+ default:\r
+ pMessage = "INVALID OPERATION";\r
+ break;\r
+ }\r
+\r
+ return pMessage;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/* Provide access to internal functions and variables if testing. */\r
+#if IOT_BUILD_TESTS == 1\r
+ #include "iot_test_access_mqtt_api.c"\r
+#endif\r