]> git.sur5r.net Git - freertos/blob - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
Rename \FreeRTOS-Plus\Source\FreeRTOS-Plus-IoT-SDK to \FreeRTOS-Plus\Source\FreeRTOS...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_subscription.c
1 /*\r
2  * Amazon FreeRTOS MQTT V2.0.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://aws.amazon.com/freertos\r
23  * http://www.FreeRTOS.org\r
24  */\r
25 \r
26 /**\r
27  * @file iot_mqtt_subscription.c\r
28  * @brief Implements functions that manage subscriptions for an MQTT connection.\r
29  */\r
30 \r
31 /* The config header is always included first. */\r
32 #include "iot_config.h"\r
33 \r
34 /* Standard includes. */\r
35 #include <stdbool.h>\r
36 #include <string.h>\r
37 \r
38 /* Error handling include. */\r
39 #include "private/iot_error.h"\r
40 \r
41 /* MQTT internal include. */\r
42 #include "private/iot_mqtt_internal.h"\r
43 \r
44 /* Platform layer includes. */\r
45 #include "platform/iot_threads.h"\r
46 \r
47 /*-----------------------------------------------------------*/\r
48 \r
/**
 * @brief Search key handed to #_topicMatch through its `pMatch` argument.
 *
 * The string is described by pointer and length; it is compared against the
 * topic filter stored in each subscription.
 */
typedef struct _topicMatchParams
{
    /** @brief Start of the topic name (or topic filter) being looked up. */
    const char * pTopicName;

    /** @brief Number of bytes in #_topicMatchParams_t.pTopicName. */
    uint16_t topicNameLength;

    /** @brief `true` to accept only byte-for-byte equality; `false` to also
     * evaluate the `+` and `#` wildcards of the stored topic filter. */
    bool exactMatchOnly;
} _topicMatchParams_t;
58 \r
/**
 * @brief Search key handed to #_packetMatch through its `pMatch` argument.
 */
typedef struct _packetMatchParams
{
    /** @brief Packet identifier a subscription must carry to be selected. */
    uint16_t packetIdentifier;

    /** @brief Position within the packet that must also agree, or `-1` to
     * select on the packet identifier alone. */
    int32_t order;
} _packetMatchParams_t;
67 \r
68 /*-----------------------------------------------------------*/\r
69 \r
70 /**\r
71  * @brief Matches a topic name (from a publish) with a topic filter (from a\r
72  * subscription).\r
73  *\r
74  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
75  * @param[in] pMatch Pointer to a #_topicMatchParams_t.\r
76  *\r
77  * @return `true` if the arguments match the subscription topic filter; `false`\r
78  * otherwise.\r
79  */\r
80 static bool _topicMatch( const IotLink_t * pSubscriptionLink,\r
81                          void * pMatch );\r
82 \r
83 /**\r
84  * @brief Matches a packet identifier and order.\r
85  *\r
86  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
87  * @param[in] pMatch Pointer to a #_packetMatchParams_t.\r
88  *\r
89  * @return `true` if the arguments match the subscription's packet info; `false`\r
90  * otherwise.\r
91  */\r
92 static bool _packetMatch( const IotLink_t * pSubscriptionLink,\r
93                           void * pMatch );\r
94 \r
95 /*-----------------------------------------------------------*/\r
96 \r
97 static bool _topicMatch( const IotLink_t * pSubscriptionLink,\r
98                          void * pMatch )\r
99 {\r
100     IOT_FUNCTION_ENTRY( bool, false );\r
101     uint16_t nameIndex = 0, filterIndex = 0;\r
102 \r
103     /* Because this function is called from a container function, the given link\r
104      * must never be NULL. */\r
105     IotMqtt_Assert( pSubscriptionLink != NULL );\r
106 \r
107     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
108                                                              pSubscriptionLink,\r
109                                                              link );\r
110     _topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;\r
111 \r
112     /* Extract the relevant strings and lengths from parameters. */\r
113     const char * pTopicName = pParam->pTopicName;\r
114     const char * pTopicFilter = pSubscription->pTopicFilter;\r
115     const uint16_t topicNameLength = pParam->topicNameLength;\r
116     const uint16_t topicFilterLength = pSubscription->topicFilterLength;\r
117 \r
118     /* Check for an exact match. */\r
119     if( topicNameLength == topicFilterLength )\r
120     {\r
121         status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );\r
122 \r
123         IOT_GOTO_CLEANUP();\r
124     }\r
125     else\r
126     {\r
127         EMPTY_ELSE_MARKER;\r
128     }\r
129 \r
130     /* If the topic lengths are different but an exact match is required, return\r
131      * false. */\r
132     if( pParam->exactMatchOnly == true )\r
133     {\r
134         IOT_SET_AND_GOTO_CLEANUP( false );\r
135     }\r
136     else\r
137     {\r
138         EMPTY_ELSE_MARKER;\r
139     }\r
140 \r
141     while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )\r
142     {\r
143         /* Check if the character in the topic name matches the corresponding\r
144          * character in the topic filter string. */\r
145         if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )\r
146         {\r
147             /* Handle special corner cases as documented by the MQTT protocol spec. */\r
148 \r
149             /* Filter "sport/#" also matches "sport" since # includes the parent level. */\r
150             if( nameIndex == topicNameLength - 1 )\r
151             {\r
152                 if( filterIndex == topicFilterLength - 3 )\r
153                 {\r
154                     if( pTopicFilter[ filterIndex + 1 ] == '/' )\r
155                     {\r
156                         if( pTopicFilter[ filterIndex + 2 ] == '#' )\r
157                         {\r
158                             IOT_SET_AND_GOTO_CLEANUP( true );\r
159                         }\r
160                         else\r
161                         {\r
162                             EMPTY_ELSE_MARKER;\r
163                         }\r
164                     }\r
165                     else\r
166                     {\r
167                         EMPTY_ELSE_MARKER;\r
168                     }\r
169                 }\r
170                 else\r
171                 {\r
172                     EMPTY_ELSE_MARKER;\r
173                 }\r
174             }\r
175             else\r
176             {\r
177                 EMPTY_ELSE_MARKER;\r
178             }\r
179 \r
180             /* Filter "sport/+" also matches the "sport/" but not "sport". */\r
181             if( nameIndex == topicNameLength - 1 )\r
182             {\r
183                 if( filterIndex == topicFilterLength - 2 )\r
184                 {\r
185                     if( pTopicFilter[ filterIndex + 1 ] == '+' )\r
186                     {\r
187                         IOT_SET_AND_GOTO_CLEANUP( true );\r
188                     }\r
189                     else\r
190                     {\r
191                         EMPTY_ELSE_MARKER;\r
192                     }\r
193                 }\r
194                 else\r
195                 {\r
196                     EMPTY_ELSE_MARKER;\r
197                 }\r
198             }\r
199             else\r
200             {\r
201                 EMPTY_ELSE_MARKER;\r
202             }\r
203         }\r
204         else\r
205         {\r
206             /* Check for wildcards. */\r
207             if( pTopicFilter[ filterIndex ] == '+' )\r
208             {\r
209                 /* Move topic name index to the end of the current level.\r
210                  * This is identified by '/'. */\r
211                 while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )\r
212                 {\r
213                     nameIndex++;\r
214                 }\r
215 \r
216                 /* Increment filter index to skip '/'. */\r
217                 filterIndex++;\r
218                 continue;\r
219             }\r
220             else if( pTopicFilter[ filterIndex ] == '#' )\r
221             {\r
222                 /* Subsequent characters don't need to be checked if the for the\r
223                  * multi-level wildcard. */\r
224                 IOT_SET_AND_GOTO_CLEANUP( true );\r
225             }\r
226             else\r
227             {\r
228                 /* Any character mismatch other than '+' or '#' means the topic\r
229                  * name does not match the topic filter. */\r
230                 IOT_SET_AND_GOTO_CLEANUP( false );\r
231             }\r
232         }\r
233 \r
234         /* Increment indexes. */\r
235         nameIndex++;\r
236         filterIndex++;\r
237     }\r
238 \r
239     /* If the end of both strings has been reached, they match. */\r
240     if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )\r
241     {\r
242         IOT_SET_AND_GOTO_CLEANUP( true );\r
243     }\r
244     else\r
245     {\r
246         EMPTY_ELSE_MARKER;\r
247     }\r
248 \r
249     IOT_FUNCTION_EXIT_NO_CLEANUP();\r
250 }\r
251 \r
252 /*-----------------------------------------------------------*/\r
253 \r
254 static bool _packetMatch( const IotLink_t * pSubscriptionLink,\r
255                           void * pMatch )\r
256 {\r
257     bool match = false;\r
258 \r
259     /* Because this function is called from a container function, the given link\r
260      * must never be NULL. */\r
261     IotMqtt_Assert( pSubscriptionLink != NULL );\r
262 \r
263     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
264                                                              pSubscriptionLink,\r
265                                                              link );\r
266     _packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;\r
267 \r
268     /* Compare packet identifiers. */\r
269     if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )\r
270     {\r
271         /* Compare orders if order is not -1. */\r
272         if( pParam->order == -1 )\r
273         {\r
274             match = true;\r
275         }\r
276         else\r
277         {\r
278             match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;\r
279         }\r
280     }\r
281 \r
282     /* If this subscription should be removed, check the reference count. */\r
283     if( match == true )\r
284     {\r
285         /* Reference count must not be negative. */\r
286         IotMqtt_Assert( pSubscription->references >= 0 );\r
287 \r
288         /* If the reference count is positive, this subscription cannot be\r
289          * removed yet because there are subscription callbacks using it. */\r
290         if( pSubscription->references > 0 )\r
291         {\r
292             match = false;\r
293 \r
294             /* Set the unsubscribed flag. The last active subscription callback\r
295              * will remove and clean up this subscription. */\r
296             pSubscription->unsubscribed = true;\r
297         }\r
298         else\r
299         {\r
300             EMPTY_ELSE_MARKER;\r
301         }\r
302     }\r
303     else\r
304     {\r
305         EMPTY_ELSE_MARKER;\r
306     }\r
307 \r
308     return match;\r
309 }\r
310 \r
311 /*-----------------------------------------------------------*/\r
312 \r
313 IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,\r
314                                           uint16_t subscribePacketIdentifier,\r
315                                           const IotMqttSubscription_t * pSubscriptionList,\r
316                                           size_t subscriptionCount )\r
317 {\r
318     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
319     size_t i = 0;\r
320     _mqttSubscription_t * pNewSubscription = NULL;\r
321     IotLink_t * pSubscriptionLink = NULL;\r
322     _topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };\r
323 \r
324     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
325 \r
326     for( i = 0; i < subscriptionCount; i++ )\r
327     {\r
328         /* Check if this topic filter is already registered. */\r
329         topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;\r
330         topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;\r
331         pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
332                                                           NULL,\r
333                                                           _topicMatch,\r
334                                                           &topicMatchParams );\r
335 \r
336         if( pSubscriptionLink != NULL )\r
337         {\r
338             pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
339 \r
340             /* The lengths of exactly matching topic filters must match. */\r
341             IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );\r
342 \r
343             /* Replace the callback and packet info with the new parameters. */\r
344             pNewSubscription->callback = pSubscriptionList[ i ].callback;\r
345             pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;\r
346             pNewSubscription->packetInfo.order = i;\r
347         }\r
348         else\r
349         {\r
350             /* Allocate memory for a new subscription. */\r
351             pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +\r
352                                                            pSubscriptionList[ i ].topicFilterLength );\r
353 \r
354             if( pNewSubscription == NULL )\r
355             {\r
356                 status = IOT_MQTT_NO_MEMORY;\r
357                 break;\r
358             }\r
359             else\r
360             {\r
361                 /* Clear the new subscription. */\r
362                 ( void ) memset( pNewSubscription,\r
363                                  0x00,\r
364                                  sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );\r
365 \r
366                 /* Set the members of the new subscription and add it to the list. */\r
367                 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;\r
368                 pNewSubscription->packetInfo.order = i;\r
369                 pNewSubscription->callback = pSubscriptionList[ i ].callback;\r
370                 pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;\r
371                 ( void ) memcpy( pNewSubscription->pTopicFilter,\r
372                                  pSubscriptionList[ i ].pTopicFilter,\r
373                                  ( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );\r
374 \r
375                 IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),\r
376                                           &( pNewSubscription->link ) );\r
377             }\r
378         }\r
379     }\r
380 \r
381     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
382 \r
383     /* If memory allocation failed, remove all previously added subscriptions. */\r
384     if( status != IOT_MQTT_SUCCESS )\r
385     {\r
386         _IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,\r
387                                                   pSubscriptionList,\r
388                                                   i );\r
389     }\r
390     else\r
391     {\r
392         EMPTY_ELSE_MARKER;\r
393     }\r
394 \r
395     return status;\r
396 }\r
397 \r
398 /*-----------------------------------------------------------*/\r
399 \r
400 void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,\r
401                                           IotMqttCallbackParam_t * pCallbackParam )\r
402 {\r
403     _mqttSubscription_t * pSubscription = NULL;\r
404     IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;\r
405     void * pCallbackContext = NULL;\r
406 \r
407     void ( * callbackFunction )( void *,\r
408                                  IotMqttCallbackParam_t * ) = NULL;\r
409     _topicMatchParams_t topicMatchParams =\r
410     {\r
411         .pTopicName      = pCallbackParam->u.message.info.pTopicName,\r
412         .topicNameLength = pCallbackParam->u.message.info.topicNameLength,\r
413         .exactMatchOnly  = false\r
414     };\r
415 \r
416     /* Prevent any other thread from modifying the subscription list while this\r
417      * function is searching. */\r
418     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
419 \r
420     /* Search the subscription list for all matching subscriptions starting at\r
421      * the list head. */\r
422     while( true )\r
423     {\r
424         pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
425                                                      pCurrentLink,\r
426                                                      _topicMatch,\r
427                                                      &topicMatchParams );\r
428 \r
429         /* No subscription found. Exit loop. */\r
430         if( pCurrentLink == NULL )\r
431         {\r
432             break;\r
433         }\r
434         else\r
435         {\r
436             EMPTY_ELSE_MARKER;\r
437         }\r
438 \r
439         /* Subscription found. Calculate pointer to subscription object. */\r
440         pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );\r
441 \r
442         /* Subscription validation should not have allowed a NULL callback function. */\r
443         IotMqtt_Assert( pSubscription->callback.function != NULL );\r
444 \r
445         /* Increment the subscription's reference count. */\r
446         ( pSubscription->references )++;\r
447 \r
448         /* Copy the necessary members of the subscription before releasing the\r
449          * subscription list mutex. */\r
450         pCallbackContext = pSubscription->callback.pCallbackContext;\r
451         callbackFunction = pSubscription->callback.function;\r
452 \r
453         /* Unlock the subscription list mutex. */\r
454         IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
455 \r
456         /* Set the members of the callback parameter. */\r
457         pCallbackParam->mqttConnection = pMqttConnection;\r
458         pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;\r
459         pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;\r
460 \r
461         /* Invoke the subscription callback. */\r
462         callbackFunction( pCallbackContext, pCallbackParam );\r
463 \r
464         /* Lock the subscription list mutex to decrement the reference count. */\r
465         IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
466 \r
467         /* Decrement the reference count. It must still be positive. */\r
468         ( pSubscription->references )--;\r
469         IotMqtt_Assert( pSubscription->references >= 0 );\r
470 \r
471         /* Save the pointer to the next link in case this subscription is freed. */\r
472         pNextLink = pCurrentLink->pNext;\r
473 \r
474         /* Remove this subscription if it has no references and the unsubscribed\r
475          * flag is set. */\r
476         if( pSubscription->unsubscribed == true )\r
477         {\r
478             /* An unsubscribed subscription should have been removed from the list. */\r
479             IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );\r
480 \r
481             /* Free subscriptions with no references. */\r
482             if( pSubscription->references == 0 )\r
483             {\r
484                 IotMqtt_FreeSubscription( pSubscription );\r
485             }\r
486             else\r
487             {\r
488                 EMPTY_ELSE_MARKER;\r
489             }\r
490         }\r
491         else\r
492         {\r
493             EMPTY_ELSE_MARKER;\r
494         }\r
495 \r
496         /* Move current link pointer. */\r
497         pCurrentLink = pNextLink;\r
498     }\r
499 \r
500     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
501 \r
502     _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
503 }\r
504 \r
505 /*-----------------------------------------------------------*/\r
506 \r
507 void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,\r
508                                           uint16_t packetIdentifier,\r
509                                           int32_t order )\r
510 {\r
511     const _packetMatchParams_t packetMatchParams =\r
512     {\r
513         .packetIdentifier = packetIdentifier,\r
514         .order            = order\r
515     };\r
516 \r
517     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
518     IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
519                                     _packetMatch,\r
520                                     ( void * ) ( &packetMatchParams ),\r
521                                     IotMqtt_FreeSubscription,\r
522                                     offsetof( _mqttSubscription_t, link ) );\r
523     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
524 }\r
525 \r
526 /*-----------------------------------------------------------*/\r
527 \r
528 void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,\r
529                                                const IotMqttSubscription_t * pSubscriptionList,\r
530                                                size_t subscriptionCount )\r
531 {\r
532     size_t i = 0;\r
533     _mqttSubscription_t * pSubscription = NULL;\r
534     IotLink_t * pSubscriptionLink = NULL;\r
535     _topicMatchParams_t topicMatchParams = { 0 };\r
536 \r
537     /* Prevent any other thread from modifying the subscription list while this\r
538      * function is running. */\r
539     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
540 \r
541     /* Find and remove each topic filter from the list. */\r
542     for( i = 0; i < subscriptionCount; i++ )\r
543     {\r
544         topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;\r
545         topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;\r
546         topicMatchParams.exactMatchOnly = true;\r
547 \r
548         pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),\r
549                                                           NULL,\r
550                                                           _topicMatch,\r
551                                                           &topicMatchParams );\r
552 \r
553         if( pSubscriptionLink != NULL )\r
554         {\r
555             pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
556 \r
557             /* Reference count must not be negative. */\r
558             IotMqtt_Assert( pSubscription->references >= 0 );\r
559 \r
560             /* Remove subscription from list. */\r
561             IotListDouble_Remove( pSubscriptionLink );\r
562 \r
563             /* Check the reference count. This subscription cannot be removed if\r
564              * there are subscription callbacks using it. */\r
565             if( pSubscription->references > 0 )\r
566             {\r
567                 /* Set the unsubscribed flag. The last active subscription callback\r
568                  * will remove and clean up this subscription. */\r
569                 pSubscription->unsubscribed = true;\r
570             }\r
571             else\r
572             {\r
573                 /* Free a subscription with no references. */\r
574                 IotMqtt_FreeSubscription( pSubscription );\r
575             }\r
576         }\r
577         else\r
578         {\r
579             EMPTY_ELSE_MARKER;\r
580         }\r
581     }\r
582 \r
583     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
584 }\r
585 \r
586 /*-----------------------------------------------------------*/\r
587 \r
588 bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,\r
589                            const char * pTopicFilter,\r
590                            uint16_t topicFilterLength,\r
591                            IotMqttSubscription_t * pCurrentSubscription )\r
592 {\r
593     bool status = false;\r
594     _mqttSubscription_t * pSubscription = NULL;\r
595     IotLink_t * pSubscriptionLink = NULL;\r
596     _topicMatchParams_t topicMatchParams =\r
597     {\r
598         .pTopicName      = pTopicFilter,\r
599         .topicNameLength = topicFilterLength,\r
600         .exactMatchOnly  = true\r
601     };\r
602 \r
603     /* Prevent any other thread from modifying the subscription list while this\r
604      * function is running. */\r
605     IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );\r
606 \r
607     /* Search for a matching subscription. */\r
608     pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),\r
609                                                       NULL,\r
610                                                       _topicMatch,\r
611                                                       &topicMatchParams );\r
612 \r
613     /* Check if a matching subscription was found. */\r
614     if( pSubscriptionLink != NULL )\r
615     {\r
616         pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );\r
617 \r
618         /* Copy the matching subscription to the output parameter. */\r
619         if( pCurrentSubscription != NULL )\r
620         {\r
621             pCurrentSubscription->pTopicFilter = pTopicFilter;\r
622             pCurrentSubscription->topicFilterLength = topicFilterLength;\r
623             pCurrentSubscription->qos = IOT_MQTT_QOS_0;\r
624             pCurrentSubscription->callback = pSubscription->callback;\r
625         }\r
626         else\r
627         {\r
628             EMPTY_ELSE_MARKER;\r
629         }\r
630 \r
631         status = true;\r
632     }\r
633     else\r
634     {\r
635         EMPTY_ELSE_MARKER;\r
636     }\r
637 \r
638     IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );\r
639 \r
640     return status;\r
641 }\r
642 \r
643 /*-----------------------------------------------------------*/\r
644 \r
645 /* Provide access to internal functions and variables if testing. */\r
646 #if IOT_BUILD_TESTS == 1\r
647     #include "iot_test_access_mqtt_subscription.c"\r
648 #endif\r