]> git.sur5r.net Git - freertos/blob - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
Correct an err in queue.c introduced when previously updating behaviour when queue...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_subscription.c
1 /*\r
2  * Amazon FreeRTOS MQTT V2.0.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://aws.amazon.com/freertos\r
23  * http://www.FreeRTOS.org\r
24  */\r
25 \r
26 /**\r
27  * @file iot_mqtt_subscription.c\r
28  * @brief Implements functions that manage subscriptions for an MQTT connection.\r
29  */\r
30 \r
31 /* The config header is always included first. */\r
32 #include "iot_config.h"\r
33 \r
34 /* Standard includes. */\r
35 #include <stdbool.h>\r
36 #include <string.h>\r
37 \r
38 /* Error handling include. */\r
39 #include "private/iot_error.h"\r
40 \r
41 /* MQTT internal include. */\r
42 #include "private/iot_mqtt_internal.h"\r
43 \r
44 /* Platform layer includes. */\r
45 #include "platform/iot_threads.h"\r
46 \r
47 /*-----------------------------------------------------------*/\r
48 \r
49 /**\r
50  * @brief First parameter to #_topicMatch.\r
51  */\r
52 typedef struct _topicMatchParams\r
53 {\r
54     const char * pTopicName;  /**< @brief The topic name to parse. */\r
55     uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */\r
56     bool exactMatchOnly;      /**< @brief Whether to allow wildcards or require exact matches. */\r
57 } _topicMatchParams_t;\r
58 \r
59 /**\r
60  * @brief First parameter to #_packetMatch.\r
61  */\r
62 typedef struct _packetMatchParams\r
63 {\r
64     uint16_t packetIdentifier; /**< Packet identifier to match. */\r
65     int32_t order;             /**< Order to match. Set to `-1` to ignore. */\r
66 } _packetMatchParams_t;\r
67 \r
68 /*-----------------------------------------------------------*/\r
69 \r
70 /**\r
71  * @brief Matches a topic name (from a publish) with a topic filter (from a\r
72  * subscription).\r
73  *\r
74  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
75  * @param[in] pMatch Pointer to a #_topicMatchParams_t.\r
76  *\r
77  * @return `true` if the arguments match the subscription topic filter; `false`\r
78  * otherwise.\r
79  */\r
80 static bool _topicMatch( const IotLink_t * pSubscriptionLink,\r
81                          void * pMatch );\r
82 \r
83 /**\r
84  * @brief Matches a packet identifier and order.\r
85  *\r
86  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
87  * @param[in] pMatch Pointer to a #_packetMatchParams_t.\r
88  *\r
89  * @return `true` if the arguments match the subscription's packet info; `false`\r
90  * otherwise.\r
91  */\r
92 static bool _packetMatch( const IotLink_t * pSubscriptionLink,\r
93                           void * pMatch );\r
94 \r
95 /*-----------------------------------------------------------*/\r
96 \r
97 static bool _topicMatch( const IotLink_t * pSubscriptionLink,\r
98                          void * pMatch )\r
99 {\r
100     IOT_FUNCTION_ENTRY( bool, false );\r
101     uint16_t nameIndex = 0, filterIndex = 0;\r
102 \r
103     /* Because this function is called from a container function, the given link\r
104      * must never be NULL. */\r
105     IotMqtt_Assert( pSubscriptionLink != NULL );\r
106 \r
107     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
108                                                              pSubscriptionLink,\r
109                                                              link );\r
110     _topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;\r
111 \r
112     /* Extract the relevant strings and lengths from parameters. */\r
113     const char * pTopicName = pParam->pTopicName;\r
114     const char * pTopicFilter = pSubscription->pTopicFilter;\r
115     const uint16_t topicNameLength = pParam->topicNameLength;\r
116     const uint16_t topicFilterLength = pSubscription->topicFilterLength;\r
117 \r
118     /* Check for an exact match. */\r
119     if( topicNameLength == topicFilterLength )\r
120     {\r
121         status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );\r
122 \r
123         IOT_GOTO_CLEANUP();\r
124     }\r
125     else\r
126     {\r
127         EMPTY_ELSE_MARKER;\r
128     }\r
129 \r
130     /* If the topic lengths are different but an exact match is required, return\r
131      * false. */\r
132     if( pParam->exactMatchOnly == true )\r
133     {\r
134         IOT_SET_AND_GOTO_CLEANUP( false );\r
135     }\r
136     else\r
137     {\r
138         EMPTY_ELSE_MARKER;\r
139     }\r
140 \r
141     while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )\r
142     {\r
143         /* Check if the character in the topic name matches the corresponding\r
144          * character in the topic filter string. */\r
145         if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )\r
146         {\r
147             /* Handle special corner cases as documented by the MQTT protocol spec. */\r
148 \r
149             /* Filter "sport/#" also matches "sport" since # includes the parent level. */\r
150             if( nameIndex == topicNameLength - 1 )\r
151             {\r
152                 if( filterIndex == topicFilterLength - 3 )\r
153                 {\r
154                     if( pTopicFilter[ filterIndex + 1 ] == '/' )\r
155                     {\r
156                         if( pTopicFilter[ filterIndex + 2 ] == '#' )\r
157                         {\r
158                             IOT_SET_AND_GOTO_CLEANUP( true );\r
159                         }\r
160                         else\r
161                         {\r
162                             EMPTY_ELSE_MARKER;\r
163                         }\r
164                     }\r
165                     else\r
166                     {\r
167                         EMPTY_ELSE_MARKER;\r
168                     }\r
169                 }\r
170                 else\r
171                 {\r
172                     EMPTY_ELSE_MARKER;\r
173                 }\r
174             }\r
175             else\r
176             {\r
177                 EMPTY_ELSE_MARKER;\r
178             }\r
179 \r
180             /* Filter "sport/+" also matches the "sport/" but not "sport". */\r
181             if( nameIndex == topicNameLength - 1 )\r
182             {\r
183                 if( filterIndex == topicFilterLength - 2 )\r
184                 {\r
185                     if( pTopicFilter[ filterIndex + 1 ] == '+' )\r
186                     {\r
187                         IOT_SET_AND_GOTO_CLEANUP( true );\r
188                     }\r
189                     else\r
190                     {\r
191                         EMPTY_ELSE_MARKER;\r
192                     }\r
193                 }\r
194                 else\r
195                 {\r
196                     EMPTY_ELSE_MARKER;\r
197                 }\r
198             }\r
199             else\r
200             {\r
201                 EMPTY_ELSE_MARKER;\r
202             }\r
203         }\r
204         else\r
205         {\r
206             /* Check for wildcards. */\r
207             if( pTopicFilter[ filterIndex ] == '+' )\r
208             {\r
209                 /* Move topic name index to the end of the current level.\r
210                  * This is identified by '/'. */\r
211                 while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )\r
212                 {\r
213                     nameIndex++;\r
214                 }\r
215 \r
216                 /* Increment filter index to skip '/'. */\r
217                 filterIndex++;\r
218                 continue;\r
219             }\r
220             else if( pTopicFilter[ filterIndex ] == '#' )\r
221             {\r
222                 /* Subsequent characters don't need to be checked if the for the\r
223                  * multi-level wildcard. */\r
224                 IOT_SET_AND_GOTO_CLEANUP( true );\r
225             }\r
226             else\r
227             {\r
228                 /* Any character mismatch other than '+' or '#' means the topic\r
229                  * name does not match the topic filter. */\r
230                 IOT_SET_AND_GOTO_CLEANUP( false );\r
231             }\r
232         }\r
233 \r
234         /* Increment indexes. */\r
235         nameIndex++;\r
236         filterIndex++;\r
237     }\r
238 \r
239     /* If the end of both strings has been reached, they match. */\r
240     if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )\r
241     {\r
242         IOT_SET_AND_GOTO_CLEANUP( true );\r
243     }\r
244     else\r
245     {\r
246         EMPTY_ELSE_MARKER;\r
247     }\r
248 \r
249     IOT_FUNCTION_EXIT_NO_CLEANUP();\r
250 }\r
251 \r
252 /*-----------------------------------------------------------*/\r
253 \r
254 static bool _packetMatch( const IotLink_t * pSubscriptionLink,\r
255                           void * pMatch )\r
256 {\r
257     bool match = false;\r
258 \r
259     /* Because this function is called from a container function, the given link\r
260      * must never be NULL. */\r
261     IotMqtt_Assert( pSubscriptionLink != NULL );\r
262 \r
263     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
264                                                              pSubscriptionLink,\r
265                                                              link );\r
266     _packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;\r
267 \r
268     /* Compare packet identifiers. */\r
269     if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )\r
270     {\r
271         /* Compare orders if order is not -1. */\r
272         if( pParam->order == -1 )\r
273         {\r
274             match = true;\r
275         }\r
276         else\r
277         {\r
278             match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;\r
279         }\r
280     }\r
281 \r
282     /* If this subscription should be removed, check the reference count. */\r
283     if( match == true )\r
284     {\r
285         /* Reference count must not be negative. */\r
286         IotMqtt_Assert( pSubscription->references >= 0 );\r
287 \r
288         /* If the reference count is positive, this subscription cannot be\r
289          * removed yet because there are subscription callbacks using it. */\r
290         if( pSubscription->references > 0 )\r
291         {\r
292             match = false;\r
293 \r
294             /* Set the unsubscribed flag. The last active subscription callback\r
295              * will remove and clean up this subscription. */\r
296             pSubscription->unsubscribed = true;\r
297         }\r
298         else\r
299         {\r
300             EMPTY_ELSE_MARKER;\r
301         }\r
302     }\r
303     else\r
304     {\r
305         EMPTY_ELSE_MARKER;\r
306     }\r
307 \r
308     return match;\r
309 }\r
310 \r
311 /*-----------------------------------------------------------*/\r
312 \r
313 IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,\r
314                                           uint16_t subscribePacketIdentifier,\r
315                                           const IotMqttSubscription_t * pSubscriptionList,\r
316                                           size_t subscriptionCount )\r
317 {\r
318     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
319     size_t i = 0;\r
320     _mqttSubscription_t * pNewSubscription = NULL;\r
321     IotLink_t * pSubscriptionLink = NULL;\r
322     _topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };\r
323 \r
324     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
325 \r
326     for( i = 0; i < subscriptionCount; i++ )\r
327     {\r
328         /* Check if this topic filter is already registered. */\r
329         topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;\r
330         topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;\r
331         pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
332                                                           NULL,\r
333                                                           _topicMatch,\r
334                                                           &topicMatchParams );\r
335 \r
336         if( pSubscriptionLink != NULL )\r
337         {\r
338             pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
339 \r
340             /* The lengths of exactly matching topic filters must match. */\r
341             IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );\r
342 \r
343             /* Replace the callback and packet info with the new parameters. */\r
344             pNewSubscription->callback = pSubscriptionList[ i ].callback;\r
345             pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;\r
346             pNewSubscription->packetInfo.order = i;\r
347         }\r
348         else\r
349         {\r
350             /* Allocate memory for a new subscription. */\r
351             pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +\r
352                                                            pSubscriptionList[ i ].topicFilterLength );\r
353 \r
354             if( pNewSubscription == NULL )\r
355             {\r
356                 status = IOT_MQTT_NO_MEMORY;\r
357                 break;\r
358             }\r
359             else\r
360             {\r
361                 /* Clear the new subscription. */\r
362                 ( void ) memset( pNewSubscription,\r
363                                  0x00,\r
364                                  sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );\r
365 \r
366                 /* Set the members of the new subscription and add it to the list. */\r
367                 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;\r
368                 pNewSubscription->packetInfo.order = i;\r
369                 pNewSubscription->callback = pSubscriptionList[ i ].callback;\r
370                 pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;\r
371                 ( void ) memcpy( pNewSubscription->pTopicFilter,\r
372                                  pSubscriptionList[ i ].pTopicFilter,\r
373                                  ( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );\r
374 \r
375                 IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),\r
376                                           &( pNewSubscription->link ) );\r
377             }\r
378         }\r
379     }\r
380 \r
381     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
382 \r
383     /* If memory allocation failed, remove all previously added subscriptions. */\r
384     if( status != IOT_MQTT_SUCCESS )\r
385     {\r
386         _IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,\r
387                                                   pSubscriptionList,\r
388                                                   i );\r
389     }\r
390     else\r
391     {\r
392         EMPTY_ELSE_MARKER;\r
393     }\r
394 \r
395     return status;\r
396 }\r
397 \r
398 /*-----------------------------------------------------------*/\r
399 \r
400 void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,\r
401                                           IotMqttCallbackParam_t * pCallbackParam )\r
402 {\r
403     _mqttSubscription_t * pSubscription = NULL;\r
404     IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;\r
405     void * pCallbackContext = NULL;\r
406 \r
407     void ( * callbackFunction )( void *,\r
408                                  IotMqttCallbackParam_t * ) = NULL;\r
409     _topicMatchParams_t topicMatchParams = { 0 };\r
410 \r
411     topicMatchParams.pTopicName = pCallbackParam->u.message.info.pTopicName;\r
412     topicMatchParams.topicNameLength = pCallbackParam->u.message.info.topicNameLength;\r
413     topicMatchParams.exactMatchOnly = false;\r
414 \r
415     /* Prevent any other thread from modifying the subscription list while this\r
416      * function is searching. */\r
417     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
418 \r
419     /* Search the subscription list for all matching subscriptions starting at\r
420      * the list head. */\r
421     while( true )\r
422     {\r
423         pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
424                                                      pCurrentLink,\r
425                                                      _topicMatch,\r
426                                                      &topicMatchParams );\r
427 \r
428         /* No subscription found. Exit loop. */\r
429         if( pCurrentLink == NULL )\r
430         {\r
431             break;\r
432         }\r
433         else\r
434         {\r
435             EMPTY_ELSE_MARKER;\r
436         }\r
437 \r
438         /* Subscription found. Calculate pointer to subscription object. */\r
439         pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );\r
440 \r
441         /* Subscription validation should not have allowed a NULL callback function. */\r
442         IotMqtt_Assert( pSubscription->callback.function != NULL );\r
443 \r
444         /* Increment the subscription's reference count. */\r
445         ( pSubscription->references )++;\r
446 \r
447         /* Copy the necessary members of the subscription before releasing the\r
448          * subscription list mutex. */\r
449         pCallbackContext = pSubscription->callback.pCallbackContext;\r
450         callbackFunction = pSubscription->callback.function;\r
451 \r
452         /* Unlock the subscription list mutex. */\r
453         IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
454 \r
455         /* Set the members of the callback parameter. */\r
456         pCallbackParam->mqttConnection = pMqttConnection;\r
457         pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;\r
458         pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;\r
459 \r
460         /* Invoke the subscription callback. */\r
461         callbackFunction( pCallbackContext, pCallbackParam );\r
462 \r
463         /* Lock the subscription list mutex to decrement the reference count. */\r
464         IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
465 \r
466         /* Decrement the reference count. It must still be positive. */\r
467         ( pSubscription->references )--;\r
468         IotMqtt_Assert( pSubscription->references >= 0 );\r
469 \r
470         /* Save the pointer to the next link in case this subscription is freed. */\r
471         pNextLink = pCurrentLink->pNext;\r
472 \r
473         /* Remove this subscription if it has no references and the unsubscribed\r
474          * flag is set. */\r
475         if( pSubscription->unsubscribed == true )\r
476         {\r
477             /* An unsubscribed subscription should have been removed from the list. */\r
478             IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );\r
479 \r
480             /* Free subscriptions with no references. */\r
481             if( pSubscription->references == 0 )\r
482             {\r
483                 IotMqtt_FreeSubscription( pSubscription );\r
484             }\r
485             else\r
486             {\r
487                 EMPTY_ELSE_MARKER;\r
488             }\r
489         }\r
490         else\r
491         {\r
492             EMPTY_ELSE_MARKER;\r
493         }\r
494 \r
495         /* Move current link pointer. */\r
496         pCurrentLink = pNextLink;\r
497     }\r
498 \r
499     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
500 \r
501     _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
502 }\r
503 \r
504 /*-----------------------------------------------------------*/\r
505 \r
506 void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,\r
507                                           uint16_t packetIdentifier,\r
508                                           int32_t order )\r
509 {\r
510     _packetMatchParams_t packetMatchParams = { 0 };\r
511 \r
512     packetMatchParams.packetIdentifier = packetIdentifier;\r
513     packetMatchParams.order = order;\r
514 \r
515     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
516     IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
517                                     _packetMatch,\r
518                                     ( void * ) ( &packetMatchParams ),\r
519                                     IotMqtt_FreeSubscription,\r
520                                     offsetof( _mqttSubscription_t, link ) );\r
521     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
522 }\r
523 \r
524 /*-----------------------------------------------------------*/\r
525 \r
526 void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,\r
527                                                const IotMqttSubscription_t * pSubscriptionList,\r
528                                                size_t subscriptionCount )\r
529 {\r
530     size_t i = 0;\r
531     _mqttSubscription_t * pSubscription = NULL;\r
532     IotLink_t * pSubscriptionLink = NULL;\r
533     _topicMatchParams_t topicMatchParams = { 0 };\r
534 \r
535     /* Prevent any other thread from modifying the subscription list while this\r
536      * function is running. */\r
537     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
538 \r
539     /* Find and remove each topic filter from the list. */\r
540     for( i = 0; i < subscriptionCount; i++ )\r
541     {\r
542         topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;\r
543         topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;\r
544         topicMatchParams.exactMatchOnly = true;\r
545 \r
546         pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
547                                                           NULL,\r
548                                                           _topicMatch,\r
549                                                           &topicMatchParams );\r
550 \r
551         if( pSubscriptionLink != NULL )\r
552         {\r
553             pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
554 \r
555             /* Reference count must not be negative. */\r
556             IotMqtt_Assert( pSubscription->references >= 0 );\r
557 \r
558             /* Remove subscription from list. */\r
559             IotListDouble_Remove( pSubscriptionLink );\r
560 \r
561             /* Check the reference count. This subscription cannot be removed if\r
562              * there are subscription callbacks using it. */\r
563             if( pSubscription->references > 0 )\r
564             {\r
565                 /* Set the unsubscribed flag. The last active subscription callback\r
566                  * will remove and clean up this subscription. */\r
567                 pSubscription->unsubscribed = true;\r
568             }\r
569             else\r
570             {\r
571                 /* Free a subscription with no references. */\r
572                 IotMqtt_FreeSubscription( pSubscription );\r
573             }\r
574         }\r
575         else\r
576         {\r
577             EMPTY_ELSE_MARKER;\r
578         }\r
579     }\r
580 \r
581     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
582 }\r
583 \r
584 /*-----------------------------------------------------------*/\r
585 \r
586 bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,\r
587                            const char * pTopicFilter,\r
588                            uint16_t topicFilterLength,\r
589                            IotMqttSubscription_t * const pCurrentSubscription )\r
590 {\r
591     bool status = false;\r
592     _mqttSubscription_t * pSubscription = NULL;\r
593     IotLink_t * pSubscriptionLink = NULL;\r
594     _topicMatchParams_t topicMatchParams = { 0 };\r
595 \r
596     topicMatchParams.pTopicName = pTopicFilter;\r
597     topicMatchParams.topicNameLength = topicFilterLength;\r
598     topicMatchParams.exactMatchOnly = true;\r
599 \r
600     /* Prevent any other thread from modifying the subscription list while this\r
601      * function is running. */\r
602     IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );\r
603 \r
604     /* Search for a matching subscription. */\r
605     pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),\r
606                                                       NULL,\r
607                                                       _topicMatch,\r
608                                                       &topicMatchParams );\r
609 \r
610     /* Check if a matching subscription was found. */\r
611     if( pSubscriptionLink != NULL )\r
612     {\r
613         pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
614 \r
615         /* Copy the matching subscription to the output parameter. */\r
616         if( pCurrentSubscription != NULL )\r
617         {\r
618             pCurrentSubscription->pTopicFilter = pTopicFilter;\r
619             pCurrentSubscription->topicFilterLength = topicFilterLength;\r
620             pCurrentSubscription->qos = IOT_MQTT_QOS_0;\r
621             pCurrentSubscription->callback = pSubscription->callback;\r
622         }\r
623         else\r
624         {\r
625             EMPTY_ELSE_MARKER;\r
626         }\r
627 \r
628         status = true;\r
629     }\r
630     else\r
631     {\r
632         EMPTY_ELSE_MARKER;\r
633     }\r
634 \r
635     IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );\r
636 \r
637     return status;\r
638 }\r
639 \r
640 /*-----------------------------------------------------------*/\r
641 \r
642 /* Provide access to internal functions and variables if testing. */\r
643 #if IOT_BUILD_TESTS == 1\r
644     #include "iot_test_access_mqtt_subscription.c"\r
645 #endif\r