]> git.sur5r.net Git - freertos/blobdiff - FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
Add the Labs projects provided in the V10.2.1_191129 zip file.
[freertos] / FreeRTOS-Labs / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_subscription.c
diff --git a/FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c b/FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
new file mode 100644 (file)
index 0000000..00b241a
--- /dev/null
@@ -0,0 +1,645 @@
+/*\r
+ * IoT MQTT V2.1.0\r
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ */\r
+\r
+/**\r
+ * @file iot_mqtt_subscription.c\r
+ * @brief Implements functions that manage subscriptions for an MQTT connection.\r
+ */\r
+\r
+/* The config header is always included first. */\r
+#include "iot_config.h"\r
+\r
+/* Standard includes. */\r
+#include <stdbool.h>\r
+#include <string.h>\r
+\r
+/* Error handling include. */\r
+#include "iot_error.h"\r
+\r
+/* MQTT internal include. */\r
+#include "private/iot_mqtt_internal.h"\r
+\r
+/* Platform layer includes. */\r
+#include "platform/iot_threads.h"\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief First parameter to #_topicMatch.\r
+ */\r
+typedef struct _topicMatchParams\r
+{\r
+    const char * pTopicName;  /**< @brief The topic name to parse. */\r
+    uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */\r
+    bool exactMatchOnly;      /**< @brief Whether to allow wildcards or require exact matches. */\r
+} _topicMatchParams_t;\r
+\r
+/**\r
+ * @brief First parameter to #_packetMatch.\r
+ */\r
+typedef struct _packetMatchParams\r
+{\r
+    uint16_t packetIdentifier; /**< Packet identifier to match. */\r
+    int32_t order;             /**< Order to match. Set to #MQTT_REMOVE_ALL_SUBSCRIPTIONS to ignore. */\r
+} _packetMatchParams_t;\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**\r
+ * @brief Matches a topic name (from a publish) with a topic filter (from a\r
+ * subscription).\r
+ *\r
+ * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
+ * @param[in] pMatch Pointer to a #_topicMatchParams_t.\r
+ *\r
+ * @return `true` if the arguments match the subscription topic filter; `false`\r
+ * otherwise.\r
+ */\r
+static bool _topicMatch( const IotLink_t * pSubscriptionLink,\r
+                         void * pMatch );\r
+\r
+/**\r
+ * @brief Matches a packet identifier and order.\r
+ *\r
+ * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
+ * @param[in] pMatch Pointer to a #_packetMatchParams_t.\r
+ *\r
+ * @return `true` if the arguments match the subscription's packet info; `false`\r
+ * otherwise.\r
+ */\r
+static bool _packetMatch( const IotLink_t * pSubscriptionLink,\r
+                          void * pMatch );\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _topicMatch( const IotLink_t * pSubscriptionLink,\r
+                         void * pMatch )\r
+{\r
+    IOT_FUNCTION_ENTRY( bool, false );\r
+    uint16_t nameIndex = 0, filterIndex = 0;\r
+\r
+    /* Because this function is called from a container function, the given link\r
+     * must never be NULL. */\r
+    IotMqtt_Assert( pSubscriptionLink != NULL );\r
+\r
+    _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
+                                                             pSubscriptionLink,\r
+                                                             link );\r
+    _topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;\r
+\r
+    /* Extract the relevant strings and lengths from parameters. */\r
+    const char * pTopicName = pParam->pTopicName;\r
+    const char * pTopicFilter = pSubscription->pTopicFilter;\r
+    const uint16_t topicNameLength = pParam->topicNameLength;\r
+    const uint16_t topicFilterLength = pSubscription->topicFilterLength;\r
+\r
+    /* Check for an exact match. */\r
+    if( topicNameLength == topicFilterLength )\r
+    {\r
+        status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );\r
+\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* If the topic lengths are different but an exact match is required, return\r
+     * false. */\r
+    if( pParam->exactMatchOnly == true )\r
+    {\r
+        IOT_SET_AND_GOTO_CLEANUP( false );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )\r
+    {\r
+        /* Check if the character in the topic name matches the corresponding\r
+         * character in the topic filter string. */\r
+        if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )\r
+        {\r
+            /* Handle special corner cases as documented by the MQTT protocol spec. */\r
+\r
+            /* Filter "sport/#" also matches "sport" since # includes the parent level. */\r
+            if( nameIndex == topicNameLength - 1 )\r
+            {\r
+                if( filterIndex == topicFilterLength - 3 )\r
+                {\r
+                    if( pTopicFilter[ filterIndex + 1 ] == '/' )\r
+                    {\r
+                        if( pTopicFilter[ filterIndex + 2 ] == '#' )\r
+                        {\r
+                            IOT_SET_AND_GOTO_CLEANUP( true );\r
+                        }\r
+                        else\r
+                        {\r
+                            EMPTY_ELSE_MARKER;\r
+                        }\r
+                    }\r
+                    else\r
+                    {\r
+                        EMPTY_ELSE_MARKER;\r
+                    }\r
+                }\r
+                else\r
+                {\r
+                    EMPTY_ELSE_MARKER;\r
+                }\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+\r
+            /* Filter "sport/+" also matches the "sport/" but not "sport". */\r
+            if( nameIndex == topicNameLength - 1 )\r
+            {\r
+                if( filterIndex == topicFilterLength - 2 )\r
+                {\r
+                    if( pTopicFilter[ filterIndex + 1 ] == '+' )\r
+                    {\r
+                        IOT_SET_AND_GOTO_CLEANUP( true );\r
+                    }\r
+                    else\r
+                    {\r
+                        EMPTY_ELSE_MARKER;\r
+                    }\r
+                }\r
+                else\r
+                {\r
+                    EMPTY_ELSE_MARKER;\r
+                }\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            /* Check for wildcards. */\r
+            if( pTopicFilter[ filterIndex ] == '+' )\r
+            {\r
+                /* Move topic name index to the end of the current level.\r
+                 * This is identified by '/'. */\r
+                while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )\r
+                {\r
+                    nameIndex++;\r
+                }\r
+\r
+                /* Increment filter index to skip '/'. */\r
+                filterIndex++;\r
+                continue;\r
+            }\r
+            else if( pTopicFilter[ filterIndex ] == '#' )\r
+            {\r
+                /* Subsequent characters don't need to be checked if the for the\r
+                 * multi-level wildcard. */\r
+                IOT_SET_AND_GOTO_CLEANUP( true );\r
+            }\r
+            else\r
+            {\r
+                /* Any character mismatch other than '+' or '#' means the topic\r
+                 * name does not match the topic filter. */\r
+                IOT_SET_AND_GOTO_CLEANUP( false );\r
+            }\r
+        }\r
+\r
+        /* Increment indexes. */\r
+        nameIndex++;\r
+        filterIndex++;\r
+    }\r
+\r
+    /* If the end of both strings has been reached, they match. */\r
+    if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )\r
+    {\r
+        IOT_SET_AND_GOTO_CLEANUP( true );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IOT_FUNCTION_EXIT_NO_CLEANUP();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+static bool _packetMatch( const IotLink_t * pSubscriptionLink,\r
+                          void * pMatch )\r
+{\r
+    bool match = false;\r
+\r
+    /* Because this function is called from a container function, the given link\r
+     * must never be NULL. */\r
+    IotMqtt_Assert( pSubscriptionLink != NULL );\r
+\r
+    _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
+                                                             pSubscriptionLink,\r
+                                                             link );\r
+    _packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;\r
+\r
+    /* Compare packet identifiers. */\r
+    if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )\r
+    {\r
+        /* Compare orders if order is not MQTT_REMOVE_ALL_SUBSCRIPTIONS. */\r
+        if( pParam->order == MQTT_REMOVE_ALL_SUBSCRIPTIONS )\r
+        {\r
+            match = true;\r
+        }\r
+        else\r
+        {\r
+            match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;\r
+        }\r
+    }\r
+\r
+    /* If this subscription should be removed, check the reference count. */\r
+    if( match == true )\r
+    {\r
+        /* Reference count must not be negative. */\r
+        IotMqtt_Assert( pSubscription->references >= 0 );\r
+\r
+        /* If the reference count is positive, this subscription cannot be\r
+         * removed yet because there are subscription callbacks using it. */\r
+        if( pSubscription->references > 0 )\r
+        {\r
+            match = false;\r
+\r
+            /* Set the unsubscribed flag. The last active subscription callback\r
+             * will remove and clean up this subscription. */\r
+            pSubscription->unsubscribed = true;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return match;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,\r
+                                          uint16_t subscribePacketIdentifier,\r
+                                          const IotMqttSubscription_t * pSubscriptionList,\r
+                                          size_t subscriptionCount )\r
+{\r
+    IotMqttError_t status = IOT_MQTT_SUCCESS;\r
+    size_t i = 0;\r
+    _mqttSubscription_t * pNewSubscription = NULL;\r
+    IotLink_t * pSubscriptionLink = NULL;\r
+    _topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };\r
+\r
+    IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+    for( i = 0; i < subscriptionCount; i++ )\r
+    {\r
+        /* Check if this topic filter is already registered. */\r
+        topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;\r
+        topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;\r
+        pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
+                                                          NULL,\r
+                                                          _topicMatch,\r
+                                                          &topicMatchParams );\r
+\r
+        if( pSubscriptionLink != NULL )\r
+        {\r
+            pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
+\r
+            /* The lengths of exactly matching topic filters must match. */\r
+            IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );\r
+\r
+            /* Replace the callback and packet info with the new parameters. */\r
+            pNewSubscription->callback = pSubscriptionList[ i ].callback;\r
+            pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;\r
+            pNewSubscription->packetInfo.order = i;\r
+        }\r
+        else\r
+        {\r
+            /* Allocate memory for a new subscription. */\r
+            pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +\r
+                                                           pSubscriptionList[ i ].topicFilterLength );\r
+\r
+            if( pNewSubscription == NULL )\r
+            {\r
+                status = IOT_MQTT_NO_MEMORY;\r
+                break;\r
+            }\r
+            else\r
+            {\r
+                /* Clear the new subscription. */\r
+                ( void ) memset( pNewSubscription,\r
+                                 0x00,\r
+                                 sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );\r
+\r
+                /* Set the members of the new subscription and add it to the list. */\r
+                pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;\r
+                pNewSubscription->packetInfo.order = i;\r
+                pNewSubscription->callback = pSubscriptionList[ i ].callback;\r
+                pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;\r
+                ( void ) memcpy( pNewSubscription->pTopicFilter,\r
+                                 pSubscriptionList[ i ].pTopicFilter,\r
+                                 ( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );\r
+\r
+                IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),\r
+                                          &( pNewSubscription->link ) );\r
+            }\r
+        }\r
+    }\r
+\r
+    IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+    /* If memory allocation failed, remove all previously added subscriptions. */\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        _IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,\r
+                                                  pSubscriptionList,\r
+                                                  i );\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,\r
+                                          IotMqttCallbackParam_t * pCallbackParam )\r
+{\r
+    _mqttSubscription_t * pSubscription = NULL;\r
+    IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;\r
+    void * pCallbackContext = NULL;\r
+\r
+    void ( * callbackFunction )( void *,\r
+                                 IotMqttCallbackParam_t * ) = NULL;\r
+    _topicMatchParams_t topicMatchParams = { 0 };\r
+\r
+    /* Set the members of the search parameter. */\r
+    topicMatchParams.pTopicName = pCallbackParam->u.message.info.pTopicName;\r
+    topicMatchParams.topicNameLength = pCallbackParam->u.message.info.topicNameLength;\r
+    topicMatchParams.exactMatchOnly = false;\r
+\r
+    /* Prevent any other thread from modifying the subscription list while this\r
+     * function is searching. */\r
+    IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+    /* Search the subscription list for all matching subscriptions starting at\r
+     * the list head. */\r
+    while( true )\r
+    {\r
+        pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
+                                                     pCurrentLink,\r
+                                                     _topicMatch,\r
+                                                     &topicMatchParams );\r
+\r
+        /* No subscription found. Exit loop. */\r
+        if( pCurrentLink == NULL )\r
+        {\r
+            break;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        /* Subscription found. Calculate pointer to subscription object. */\r
+        pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );\r
+\r
+        /* Subscription validation should not have allowed a NULL callback function. */\r
+        IotMqtt_Assert( pSubscription->callback.function != NULL );\r
+\r
+        /* Increment the subscription's reference count. */\r
+        ( pSubscription->references )++;\r
+\r
+        /* Copy the necessary members of the subscription before releasing the\r
+         * subscription list mutex. */\r
+        pCallbackContext = pSubscription->callback.pCallbackContext;\r
+        callbackFunction = pSubscription->callback.function;\r
+\r
+        /* Unlock the subscription list mutex. */\r
+        IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+        /* Set the members of the callback parameter. */\r
+        pCallbackParam->mqttConnection = pMqttConnection;\r
+        pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;\r
+        pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;\r
+\r
+        /* Invoke the subscription callback. */\r
+        callbackFunction( pCallbackContext, pCallbackParam );\r
+\r
+        /* Lock the subscription list mutex to decrement the reference count. */\r
+        IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+        /* Decrement the reference count. It must still be positive. */\r
+        ( pSubscription->references )--;\r
+        IotMqtt_Assert( pSubscription->references >= 0 );\r
+\r
+        /* Save the pointer to the next link in case this subscription is freed. */\r
+        pNextLink = pCurrentLink->pNext;\r
+\r
+        /* Remove this subscription if it has no references and the unsubscribed\r
+         * flag is set. */\r
+        if( pSubscription->unsubscribed == true )\r
+        {\r
+            /* An unsubscribed subscription should have been removed from the list. */\r
+            IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );\r
+\r
+            /* Free subscriptions with no references. */\r
+            if( pSubscription->references == 0 )\r
+            {\r
+                IotMqtt_FreeSubscription( pSubscription );\r
+            }\r
+            else\r
+            {\r
+                EMPTY_ELSE_MARKER;\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        /* Move current link pointer. */\r
+        pCurrentLink = pNextLink;\r
+    }\r
+\r
+    IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+    _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,\r
+                                          uint16_t packetIdentifier,\r
+                                          int32_t order )\r
+{\r
+    _packetMatchParams_t packetMatchParams = { 0 };\r
+\r
+    /* Set the members of the search parameter. */\r
+    packetMatchParams.packetIdentifier = packetIdentifier;\r
+    packetMatchParams.order = order;\r
+\r
+    IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
+    IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
+                                    _packetMatch,\r
+                                    ( void * ) ( &packetMatchParams ),\r
+                                    IotMqtt_FreeSubscription,\r
+                                    offsetof( _mqttSubscription_t, link ) );\r
+    IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,\r
+                                               const IotMqttSubscription_t * pSubscriptionList,\r
+                                               size_t subscriptionCount )\r
+{\r
+    size_t i = 0;\r
+    _mqttSubscription_t * pSubscription = NULL;\r
+    IotLink_t * pSubscriptionLink = NULL;\r
+    _topicMatchParams_t topicMatchParams = { 0 };\r
+\r
+    /* Prevent any other thread from modifying the subscription list while this\r
+     * function is running. */\r
+    IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
+\r
+    /* Find and remove each topic filter from the list. */\r
+    for( i = 0; i < subscriptionCount; i++ )\r
+    {\r
+        topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;\r
+        topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;\r
+        topicMatchParams.exactMatchOnly = true;\r
+\r
+        pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
+                                                          NULL,\r
+                                                          _topicMatch,\r
+                                                          &topicMatchParams );\r
+\r
+        if( pSubscriptionLink != NULL )\r
+        {\r
+            pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
+\r
+            /* Reference count must not be negative. */\r
+            IotMqtt_Assert( pSubscription->references >= 0 );\r
+\r
+            /* Remove subscription from list. */\r
+            IotListDouble_Remove( pSubscriptionLink );\r
+\r
+            /* Check the reference count. This subscription cannot be removed if\r
+             * there are subscription callbacks using it. */\r
+            if( pSubscription->references > 0 )\r
+            {\r
+                /* Set the unsubscribed flag. The last active subscription callback\r
+                 * will remove and clean up this subscription. */\r
+                pSubscription->unsubscribed = true;\r
+            }\r
+            else\r
+            {\r
+                /* Free a subscription with no references. */\r
+                IotMqtt_FreeSubscription( pSubscription );\r
+            }\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+    }\r
+\r
+    IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,\r
+                           const char * pTopicFilter,\r
+                           uint16_t topicFilterLength,\r
+                           IotMqttSubscription_t * const pCurrentSubscription )\r
+{\r
+    bool status = false;\r
+    _mqttSubscription_t * pSubscription = NULL;\r
+    IotLink_t * pSubscriptionLink = NULL;\r
+    _topicMatchParams_t topicMatchParams = { 0 };\r
+\r
+    /* Set the members of the search parameter. */\r
+    topicMatchParams.pTopicName = pTopicFilter;\r
+    topicMatchParams.topicNameLength = topicFilterLength;\r
+    topicMatchParams.exactMatchOnly = true;\r
+\r
+    /* Prevent any other thread from modifying the subscription list while this\r
+     * function is running. */\r
+    IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );\r
+\r
+    /* Search for a matching subscription. */\r
+    pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),\r
+                                                      NULL,\r
+                                                      _topicMatch,\r
+                                                      &topicMatchParams );\r
+\r
+    /* Check if a matching subscription was found. */\r
+    if( pSubscriptionLink != NULL )\r
+    {\r
+        pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
+\r
+        /* Copy the matching subscription to the output parameter. */\r
+        if( pCurrentSubscription != NULL )\r
+        {\r
+            pCurrentSubscription->pTopicFilter = pTopicFilter;\r
+            pCurrentSubscription->topicFilterLength = topicFilterLength;\r
+            pCurrentSubscription->qos = IOT_MQTT_QOS_0;\r
+            pCurrentSubscription->callback = pSubscription->callback;\r
+        }\r
+        else\r
+        {\r
+            EMPTY_ELSE_MARKER;\r
+        }\r
+\r
+        status = true;\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );\r
+\r
+    return status;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/* Provide access to internal functions and variables if testing. */\r
+#if IOT_BUILD_TESTS == 1\r
+    #include "iot_test_access_mqtt_subscription.c"\r
+#endif\r