]> git.sur5r.net Git - freertos/blob - FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
Add the Labs projects provided in the V10.2.1_191129 zip file.
[freertos] / FreeRTOS-Labs / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_subscription.c
1 /*\r
2  * IoT MQTT V2.1.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  */\r
22 \r
23 /**\r
24  * @file iot_mqtt_subscription.c\r
25  * @brief Implements functions that manage subscriptions for an MQTT connection.\r
26  */\r
27 \r
28 /* The config header is always included first. */\r
29 #include "iot_config.h"\r
30 \r
31 /* Standard includes. */\r
32 #include <stdbool.h>\r
33 #include <string.h>\r
34 \r
35 /* Error handling include. */\r
36 #include "iot_error.h"\r
37 \r
38 /* MQTT internal include. */\r
39 #include "private/iot_mqtt_internal.h"\r
40 \r
41 /* Platform layer includes. */\r
42 #include "platform/iot_threads.h"\r
43 \r
44 /*-----------------------------------------------------------*/\r
45 \r
46 /**\r
47  * @brief First parameter to #_topicMatch.\r
48  */\r
49 typedef struct _topicMatchParams\r
50 {\r
51     const char * pTopicName;  /**< @brief The topic name to parse. */\r
52     uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */\r
53     bool exactMatchOnly;      /**< @brief Whether to allow wildcards or require exact matches. */\r
54 } _topicMatchParams_t;\r
55 \r
56 /**\r
57  * @brief First parameter to #_packetMatch.\r
58  */\r
59 typedef struct _packetMatchParams\r
60 {\r
61     uint16_t packetIdentifier; /**< Packet identifier to match. */\r
62     int32_t order;             /**< Order to match. Set to #MQTT_REMOVE_ALL_SUBSCRIPTIONS to ignore. */\r
63 } _packetMatchParams_t;\r
64 \r
65 /*-----------------------------------------------------------*/\r
66 \r
67 /**\r
68  * @brief Matches a topic name (from a publish) with a topic filter (from a\r
69  * subscription).\r
70  *\r
71  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
72  * @param[in] pMatch Pointer to a #_topicMatchParams_t.\r
73  *\r
74  * @return `true` if the arguments match the subscription topic filter; `false`\r
75  * otherwise.\r
76  */\r
77 static bool _topicMatch( const IotLink_t * pSubscriptionLink,\r
78                          void * pMatch );\r
79 \r
80 /**\r
81  * @brief Matches a packet identifier and order.\r
82  *\r
83  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
84  * @param[in] pMatch Pointer to a #_packetMatchParams_t.\r
85  *\r
86  * @return `true` if the arguments match the subscription's packet info; `false`\r
87  * otherwise.\r
88  */\r
89 static bool _packetMatch( const IotLink_t * pSubscriptionLink,\r
90                           void * pMatch );\r
91 \r
92 /*-----------------------------------------------------------*/\r
93 \r
94 static bool _topicMatch( const IotLink_t * pSubscriptionLink,\r
95                          void * pMatch )\r
96 {\r
97     IOT_FUNCTION_ENTRY( bool, false );\r
98     uint16_t nameIndex = 0, filterIndex = 0;\r
99 \r
100     /* Because this function is called from a container function, the given link\r
101      * must never be NULL. */\r
102     IotMqtt_Assert( pSubscriptionLink != NULL );\r
103 \r
104     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
105                                                              pSubscriptionLink,\r
106                                                              link );\r
107     _topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;\r
108 \r
109     /* Extract the relevant strings and lengths from parameters. */\r
110     const char * pTopicName = pParam->pTopicName;\r
111     const char * pTopicFilter = pSubscription->pTopicFilter;\r
112     const uint16_t topicNameLength = pParam->topicNameLength;\r
113     const uint16_t topicFilterLength = pSubscription->topicFilterLength;\r
114 \r
115     /* Check for an exact match. */\r
116     if( topicNameLength == topicFilterLength )\r
117     {\r
118         status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );\r
119 \r
120         IOT_GOTO_CLEANUP();\r
121     }\r
122     else\r
123     {\r
124         EMPTY_ELSE_MARKER;\r
125     }\r
126 \r
127     /* If the topic lengths are different but an exact match is required, return\r
128      * false. */\r
129     if( pParam->exactMatchOnly == true )\r
130     {\r
131         IOT_SET_AND_GOTO_CLEANUP( false );\r
132     }\r
133     else\r
134     {\r
135         EMPTY_ELSE_MARKER;\r
136     }\r
137 \r
138     while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )\r
139     {\r
140         /* Check if the character in the topic name matches the corresponding\r
141          * character in the topic filter string. */\r
142         if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )\r
143         {\r
144             /* Handle special corner cases as documented by the MQTT protocol spec. */\r
145 \r
146             /* Filter "sport/#" also matches "sport" since # includes the parent level. */\r
147             if( nameIndex == topicNameLength - 1 )\r
148             {\r
149                 if( filterIndex == topicFilterLength - 3 )\r
150                 {\r
151                     if( pTopicFilter[ filterIndex + 1 ] == '/' )\r
152                     {\r
153                         if( pTopicFilter[ filterIndex + 2 ] == '#' )\r
154                         {\r
155                             IOT_SET_AND_GOTO_CLEANUP( true );\r
156                         }\r
157                         else\r
158                         {\r
159                             EMPTY_ELSE_MARKER;\r
160                         }\r
161                     }\r
162                     else\r
163                     {\r
164                         EMPTY_ELSE_MARKER;\r
165                     }\r
166                 }\r
167                 else\r
168                 {\r
169                     EMPTY_ELSE_MARKER;\r
170                 }\r
171             }\r
172             else\r
173             {\r
174                 EMPTY_ELSE_MARKER;\r
175             }\r
176 \r
177             /* Filter "sport/+" also matches the "sport/" but not "sport". */\r
178             if( nameIndex == topicNameLength - 1 )\r
179             {\r
180                 if( filterIndex == topicFilterLength - 2 )\r
181                 {\r
182                     if( pTopicFilter[ filterIndex + 1 ] == '+' )\r
183                     {\r
184                         IOT_SET_AND_GOTO_CLEANUP( true );\r
185                     }\r
186                     else\r
187                     {\r
188                         EMPTY_ELSE_MARKER;\r
189                     }\r
190                 }\r
191                 else\r
192                 {\r
193                     EMPTY_ELSE_MARKER;\r
194                 }\r
195             }\r
196             else\r
197             {\r
198                 EMPTY_ELSE_MARKER;\r
199             }\r
200         }\r
201         else\r
202         {\r
203             /* Check for wildcards. */\r
204             if( pTopicFilter[ filterIndex ] == '+' )\r
205             {\r
206                 /* Move topic name index to the end of the current level.\r
207                  * This is identified by '/'. */\r
208                 while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )\r
209                 {\r
210                     nameIndex++;\r
211                 }\r
212 \r
213                 /* Increment filter index to skip '/'. */\r
214                 filterIndex++;\r
215                 continue;\r
216             }\r
217             else if( pTopicFilter[ filterIndex ] == '#' )\r
218             {\r
219                 /* Subsequent characters don't need to be checked if the for the\r
220                  * multi-level wildcard. */\r
221                 IOT_SET_AND_GOTO_CLEANUP( true );\r
222             }\r
223             else\r
224             {\r
225                 /* Any character mismatch other than '+' or '#' means the topic\r
226                  * name does not match the topic filter. */\r
227                 IOT_SET_AND_GOTO_CLEANUP( false );\r
228             }\r
229         }\r
230 \r
231         /* Increment indexes. */\r
232         nameIndex++;\r
233         filterIndex++;\r
234     }\r
235 \r
236     /* If the end of both strings has been reached, they match. */\r
237     if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )\r
238     {\r
239         IOT_SET_AND_GOTO_CLEANUP( true );\r
240     }\r
241     else\r
242     {\r
243         EMPTY_ELSE_MARKER;\r
244     }\r
245 \r
246     IOT_FUNCTION_EXIT_NO_CLEANUP();\r
247 }\r
248 \r
249 /*-----------------------------------------------------------*/\r
250 \r
251 static bool _packetMatch( const IotLink_t * pSubscriptionLink,\r
252                           void * pMatch )\r
253 {\r
254     bool match = false;\r
255 \r
256     /* Because this function is called from a container function, the given link\r
257      * must never be NULL. */\r
258     IotMqtt_Assert( pSubscriptionLink != NULL );\r
259 \r
260     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
261                                                              pSubscriptionLink,\r
262                                                              link );\r
263     _packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;\r
264 \r
265     /* Compare packet identifiers. */\r
266     if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )\r
267     {\r
268         /* Compare orders if order is not MQTT_REMOVE_ALL_SUBSCRIPTIONS. */\r
269         if( pParam->order == MQTT_REMOVE_ALL_SUBSCRIPTIONS )\r
270         {\r
271             match = true;\r
272         }\r
273         else\r
274         {\r
275             match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;\r
276         }\r
277     }\r
278 \r
279     /* If this subscription should be removed, check the reference count. */\r
280     if( match == true )\r
281     {\r
282         /* Reference count must not be negative. */\r
283         IotMqtt_Assert( pSubscription->references >= 0 );\r
284 \r
285         /* If the reference count is positive, this subscription cannot be\r
286          * removed yet because there are subscription callbacks using it. */\r
287         if( pSubscription->references > 0 )\r
288         {\r
289             match = false;\r
290 \r
291             /* Set the unsubscribed flag. The last active subscription callback\r
292              * will remove and clean up this subscription. */\r
293             pSubscription->unsubscribed = true;\r
294         }\r
295         else\r
296         {\r
297             EMPTY_ELSE_MARKER;\r
298         }\r
299     }\r
300     else\r
301     {\r
302         EMPTY_ELSE_MARKER;\r
303     }\r
304 \r
305     return match;\r
306 }\r
307 \r
308 /*-----------------------------------------------------------*/\r
309 \r
310 IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,\r
311                                           uint16_t subscribePacketIdentifier,\r
312                                           const IotMqttSubscription_t * pSubscriptionList,\r
313                                           size_t subscriptionCount )\r
314 {\r
315     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
316     size_t i = 0;\r
317     _mqttSubscription_t * pNewSubscription = NULL;\r
318     IotLink_t * pSubscriptionLink = NULL;\r
319     _topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };\r
320 \r
321     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
322 \r
323     for( i = 0; i < subscriptionCount; i++ )\r
324     {\r
325         /* Check if this topic filter is already registered. */\r
326         topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;\r
327         topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;\r
328         pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
329                                                           NULL,\r
330                                                           _topicMatch,\r
331                                                           &topicMatchParams );\r
332 \r
333         if( pSubscriptionLink != NULL )\r
334         {\r
335             pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
336 \r
337             /* The lengths of exactly matching topic filters must match. */\r
338             IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );\r
339 \r
340             /* Replace the callback and packet info with the new parameters. */\r
341             pNewSubscription->callback = pSubscriptionList[ i ].callback;\r
342             pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;\r
343             pNewSubscription->packetInfo.order = i;\r
344         }\r
345         else\r
346         {\r
347             /* Allocate memory for a new subscription. */\r
348             pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +\r
349                                                            pSubscriptionList[ i ].topicFilterLength );\r
350 \r
351             if( pNewSubscription == NULL )\r
352             {\r
353                 status = IOT_MQTT_NO_MEMORY;\r
354                 break;\r
355             }\r
356             else\r
357             {\r
358                 /* Clear the new subscription. */\r
359                 ( void ) memset( pNewSubscription,\r
360                                  0x00,\r
361                                  sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );\r
362 \r
363                 /* Set the members of the new subscription and add it to the list. */\r
364                 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;\r
365                 pNewSubscription->packetInfo.order = i;\r
366                 pNewSubscription->callback = pSubscriptionList[ i ].callback;\r
367                 pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;\r
368                 ( void ) memcpy( pNewSubscription->pTopicFilter,\r
369                                  pSubscriptionList[ i ].pTopicFilter,\r
370                                  ( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );\r
371 \r
372                 IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),\r
373                                           &( pNewSubscription->link ) );\r
374             }\r
375         }\r
376     }\r
377 \r
378     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
379 \r
380     /* If memory allocation failed, remove all previously added subscriptions. */\r
381     if( status != IOT_MQTT_SUCCESS )\r
382     {\r
383         _IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,\r
384                                                   pSubscriptionList,\r
385                                                   i );\r
386     }\r
387     else\r
388     {\r
389         EMPTY_ELSE_MARKER;\r
390     }\r
391 \r
392     return status;\r
393 }\r
394 \r
395 /*-----------------------------------------------------------*/\r
396 \r
397 void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,\r
398                                           IotMqttCallbackParam_t * pCallbackParam )\r
399 {\r
400     _mqttSubscription_t * pSubscription = NULL;\r
401     IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;\r
402     void * pCallbackContext = NULL;\r
403 \r
404     void ( * callbackFunction )( void *,\r
405                                  IotMqttCallbackParam_t * ) = NULL;\r
406     _topicMatchParams_t topicMatchParams = { 0 };\r
407 \r
408     /* Set the members of the search parameter. */\r
409     topicMatchParams.pTopicName = pCallbackParam->u.message.info.pTopicName;\r
410     topicMatchParams.topicNameLength = pCallbackParam->u.message.info.topicNameLength;\r
411     topicMatchParams.exactMatchOnly = false;\r
412 \r
413     /* Prevent any other thread from modifying the subscription list while this\r
414      * function is searching. */\r
415     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
416 \r
417     /* Search the subscription list for all matching subscriptions starting at\r
418      * the list head. */\r
419     while( true )\r
420     {\r
421         pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
422                                                      pCurrentLink,\r
423                                                      _topicMatch,\r
424                                                      &topicMatchParams );\r
425 \r
426         /* No subscription found. Exit loop. */\r
427         if( pCurrentLink == NULL )\r
428         {\r
429             break;\r
430         }\r
431         else\r
432         {\r
433             EMPTY_ELSE_MARKER;\r
434         }\r
435 \r
436         /* Subscription found. Calculate pointer to subscription object. */\r
437         pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );\r
438 \r
439         /* Subscription validation should not have allowed a NULL callback function. */\r
440         IotMqtt_Assert( pSubscription->callback.function != NULL );\r
441 \r
442         /* Increment the subscription's reference count. */\r
443         ( pSubscription->references )++;\r
444 \r
445         /* Copy the necessary members of the subscription before releasing the\r
446          * subscription list mutex. */\r
447         pCallbackContext = pSubscription->callback.pCallbackContext;\r
448         callbackFunction = pSubscription->callback.function;\r
449 \r
450         /* Unlock the subscription list mutex. */\r
451         IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
452 \r
453         /* Set the members of the callback parameter. */\r
454         pCallbackParam->mqttConnection = pMqttConnection;\r
455         pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;\r
456         pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;\r
457 \r
458         /* Invoke the subscription callback. */\r
459         callbackFunction( pCallbackContext, pCallbackParam );\r
460 \r
461         /* Lock the subscription list mutex to decrement the reference count. */\r
462         IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
463 \r
464         /* Decrement the reference count. It must still be positive. */\r
465         ( pSubscription->references )--;\r
466         IotMqtt_Assert( pSubscription->references >= 0 );\r
467 \r
468         /* Save the pointer to the next link in case this subscription is freed. */\r
469         pNextLink = pCurrentLink->pNext;\r
470 \r
471         /* Remove this subscription if it has no references and the unsubscribed\r
472          * flag is set. */\r
473         if( pSubscription->unsubscribed == true )\r
474         {\r
475             /* An unsubscribed subscription should have been removed from the list. */\r
476             IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );\r
477 \r
478             /* Free subscriptions with no references. */\r
479             if( pSubscription->references == 0 )\r
480             {\r
481                 IotMqtt_FreeSubscription( pSubscription );\r
482             }\r
483             else\r
484             {\r
485                 EMPTY_ELSE_MARKER;\r
486             }\r
487         }\r
488         else\r
489         {\r
490             EMPTY_ELSE_MARKER;\r
491         }\r
492 \r
493         /* Move current link pointer. */\r
494         pCurrentLink = pNextLink;\r
495     }\r
496 \r
497     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
498 \r
499     _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
500 }\r
501 \r
502 /*-----------------------------------------------------------*/\r
503 \r
504 void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,\r
505                                           uint16_t packetIdentifier,\r
506                                           int32_t order )\r
507 {\r
508     _packetMatchParams_t packetMatchParams = { 0 };\r
509 \r
510     /* Set the members of the search parameter. */\r
511     packetMatchParams.packetIdentifier = packetIdentifier;\r
512     packetMatchParams.order = order;\r
513 \r
514     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
515     IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
516                                     _packetMatch,\r
517                                     ( void * ) ( &packetMatchParams ),\r
518                                     IotMqtt_FreeSubscription,\r
519                                     offsetof( _mqttSubscription_t, link ) );\r
520     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
521 }\r
522 \r
523 /*-----------------------------------------------------------*/\r
524 \r
525 void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,\r
526                                                const IotMqttSubscription_t * pSubscriptionList,\r
527                                                size_t subscriptionCount )\r
528 {\r
529     size_t i = 0;\r
530     _mqttSubscription_t * pSubscription = NULL;\r
531     IotLink_t * pSubscriptionLink = NULL;\r
532     _topicMatchParams_t topicMatchParams = { 0 };\r
533 \r
534     /* Prevent any other thread from modifying the subscription list while this\r
535      * function is running. */\r
536     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
537 \r
538     /* Find and remove each topic filter from the list. */\r
539     for( i = 0; i < subscriptionCount; i++ )\r
540     {\r
541         topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;\r
542         topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;\r
543         topicMatchParams.exactMatchOnly = true;\r
544 \r
545         pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
546                                                           NULL,\r
547                                                           _topicMatch,\r
548                                                           &topicMatchParams );\r
549 \r
550         if( pSubscriptionLink != NULL )\r
551         {\r
552             pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
553 \r
554             /* Reference count must not be negative. */\r
555             IotMqtt_Assert( pSubscription->references >= 0 );\r
556 \r
557             /* Remove subscription from list. */\r
558             IotListDouble_Remove( pSubscriptionLink );\r
559 \r
560             /* Check the reference count. This subscription cannot be removed if\r
561              * there are subscription callbacks using it. */\r
562             if( pSubscription->references > 0 )\r
563             {\r
564                 /* Set the unsubscribed flag. The last active subscription callback\r
565                  * will remove and clean up this subscription. */\r
566                 pSubscription->unsubscribed = true;\r
567             }\r
568             else\r
569             {\r
570                 /* Free a subscription with no references. */\r
571                 IotMqtt_FreeSubscription( pSubscription );\r
572             }\r
573         }\r
574         else\r
575         {\r
576             EMPTY_ELSE_MARKER;\r
577         }\r
578     }\r
579 \r
580     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
581 }\r
582 \r
583 /*-----------------------------------------------------------*/\r
584 \r
585 bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,\r
586                            const char * pTopicFilter,\r
587                            uint16_t topicFilterLength,\r
588                            IotMqttSubscription_t * const pCurrentSubscription )\r
589 {\r
590     bool status = false;\r
591     _mqttSubscription_t * pSubscription = NULL;\r
592     IotLink_t * pSubscriptionLink = NULL;\r
593     _topicMatchParams_t topicMatchParams = { 0 };\r
594 \r
595     /* Set the members of the search parameter. */\r
596     topicMatchParams.pTopicName = pTopicFilter;\r
597     topicMatchParams.topicNameLength = topicFilterLength;\r
598     topicMatchParams.exactMatchOnly = true;\r
599 \r
600     /* Prevent any other thread from modifying the subscription list while this\r
601      * function is running. */\r
602     IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );\r
603 \r
604     /* Search for a matching subscription. */\r
605     pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),\r
606                                                       NULL,\r
607                                                       _topicMatch,\r
608                                                       &topicMatchParams );\r
609 \r
610     /* Check if a matching subscription was found. */\r
611     if( pSubscriptionLink != NULL )\r
612     {\r
613         pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
614 \r
615         /* Copy the matching subscription to the output parameter. */\r
616         if( pCurrentSubscription != NULL )\r
617         {\r
618             pCurrentSubscription->pTopicFilter = pTopicFilter;\r
619             pCurrentSubscription->topicFilterLength = topicFilterLength;\r
620             pCurrentSubscription->qos = IOT_MQTT_QOS_0;\r
621             pCurrentSubscription->callback = pSubscription->callback;\r
622         }\r
623         else\r
624         {\r
625             EMPTY_ELSE_MARKER;\r
626         }\r
627 \r
628         status = true;\r
629     }\r
630     else\r
631     {\r
632         EMPTY_ELSE_MARKER;\r
633     }\r
634 \r
635     IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );\r
636 \r
637     return status;\r
638 }\r
639 \r
640 /*-----------------------------------------------------------*/\r
641 \r
642 /* Provide access to internal functions and variables if testing. */\r
643 #if IOT_BUILD_TESTS == 1\r
644     #include "iot_test_access_mqtt_subscription.c"\r
645 #endif\r