3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
24 * @file iot_mqtt_subscription.c
\r
25 * @brief Implements functions that manage subscriptions for an MQTT connection.
\r
28 /* The config header is always included first. */
\r
29 #include "iot_config.h"
\r
31 /* Standard includes. */
\r
32 #include <stdbool.h>
\r
35 /* Error handling include. */
\r
36 #include "iot_error.h"
\r
38 /* MQTT internal include. */
\r
39 #include "private/iot_mqtt_internal.h"
\r
41 /* Platform layer includes. */
\r
42 #include "platform/iot_threads.h"
\r
44 /*-----------------------------------------------------------*/
\r
47 * @brief First parameter to #_topicMatch.
\r
49 typedef struct _topicMatchParams
\r
51 const char * pTopicName; /**< @brief The topic name to parse. */
\r
52 uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */
\r
53 bool exactMatchOnly; /**< @brief Whether to allow wildcards or require exact matches. */
\r
54 } _topicMatchParams_t;
\r
57 * @brief First parameter to #_packetMatch.
\r
59 typedef struct _packetMatchParams
\r
61 uint16_t packetIdentifier; /**< Packet identifier to match. */
\r
62 int32_t order; /**< Order to match. Set to #MQTT_REMOVE_ALL_SUBSCRIPTIONS to ignore. */
\r
63 } _packetMatchParams_t;
\r
65 /*-----------------------------------------------------------*/
\r
68 * @brief Matches a topic name (from a publish) with a topic filter (from a
\r
71 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
72 * @param[in] pMatch Pointer to a #_topicMatchParams_t.
\r
74 * @return `true` if the arguments match the subscription topic filter; `false`
\r
77 static bool _topicMatch( const IotLink_t * pSubscriptionLink,
\r
81 * @brief Matches a packet identifier and order.
\r
83 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
84 * @param[in] pMatch Pointer to a #_packetMatchParams_t.
\r
86 * @return `true` if the arguments match the subscription's packet info; `false`
\r
89 static bool _packetMatch( const IotLink_t * pSubscriptionLink,
\r
92 /*-----------------------------------------------------------*/
\r
94 static bool _topicMatch( const IotLink_t * pSubscriptionLink,
\r
97 IOT_FUNCTION_ENTRY( bool, false );
\r
98 uint16_t nameIndex = 0, filterIndex = 0;
\r
100 /* Because this function is called from a container function, the given link
\r
101 * must never be NULL. */
\r
102 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
104 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
107 _topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;
\r
109 /* Extract the relevant strings and lengths from parameters. */
\r
110 const char * pTopicName = pParam->pTopicName;
\r
111 const char * pTopicFilter = pSubscription->pTopicFilter;
\r
112 const uint16_t topicNameLength = pParam->topicNameLength;
\r
113 const uint16_t topicFilterLength = pSubscription->topicFilterLength;
\r
115 /* Check for an exact match. */
\r
116 if( topicNameLength == topicFilterLength )
\r
118 status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );
\r
120 IOT_GOTO_CLEANUP();
\r
127 /* If the topic lengths are different but an exact match is required, return
\r
129 if( pParam->exactMatchOnly == true )
\r
131 IOT_SET_AND_GOTO_CLEANUP( false );
\r
138 while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
\r
140 /* Check if the character in the topic name matches the corresponding
\r
141 * character in the topic filter string. */
\r
142 if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
\r
144 /* Handle special corner cases as documented by the MQTT protocol spec. */
\r
146 /* Filter "sport/#" also matches "sport" since # includes the parent level. */
\r
147 if( nameIndex == topicNameLength - 1 )
\r
149 if( filterIndex == topicFilterLength - 3 )
\r
151 if( pTopicFilter[ filterIndex + 1 ] == '/' )
\r
153 if( pTopicFilter[ filterIndex + 2 ] == '#' )
\r
155 IOT_SET_AND_GOTO_CLEANUP( true );
\r
177 /* Filter "sport/+" also matches the "sport/" but not "sport". */
\r
178 if( nameIndex == topicNameLength - 1 )
\r
180 if( filterIndex == topicFilterLength - 2 )
\r
182 if( pTopicFilter[ filterIndex + 1 ] == '+' )
\r
184 IOT_SET_AND_GOTO_CLEANUP( true );
\r
203 /* Check for wildcards. */
\r
204 if( pTopicFilter[ filterIndex ] == '+' )
\r
206 /* Move topic name index to the end of the current level.
\r
207 * This is identified by '/'. */
\r
208 while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )
\r
213 /* Increment filter index to skip '/'. */
\r
217 else if( pTopicFilter[ filterIndex ] == '#' )
\r
219 /* Subsequent characters don't need to be checked if the for the
\r
220 * multi-level wildcard. */
\r
221 IOT_SET_AND_GOTO_CLEANUP( true );
\r
225 /* Any character mismatch other than '+' or '#' means the topic
\r
226 * name does not match the topic filter. */
\r
227 IOT_SET_AND_GOTO_CLEANUP( false );
\r
231 /* Increment indexes. */
\r
236 /* If the end of both strings has been reached, they match. */
\r
237 if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )
\r
239 IOT_SET_AND_GOTO_CLEANUP( true );
\r
246 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
249 /*-----------------------------------------------------------*/
\r
251 static bool _packetMatch( const IotLink_t * pSubscriptionLink,
\r
254 bool match = false;
\r
256 /* Because this function is called from a container function, the given link
\r
257 * must never be NULL. */
\r
258 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
260 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
263 _packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;
\r
265 /* Compare packet identifiers. */
\r
266 if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )
\r
268 /* Compare orders if order is not MQTT_REMOVE_ALL_SUBSCRIPTIONS. */
\r
269 if( pParam->order == MQTT_REMOVE_ALL_SUBSCRIPTIONS )
\r
275 match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;
\r
279 /* If this subscription should be removed, check the reference count. */
\r
280 if( match == true )
\r
282 /* Reference count must not be negative. */
\r
283 IotMqtt_Assert( pSubscription->references >= 0 );
\r
285 /* If the reference count is positive, this subscription cannot be
\r
286 * removed yet because there are subscription callbacks using it. */
\r
287 if( pSubscription->references > 0 )
\r
291 /* Set the unsubscribed flag. The last active subscription callback
\r
292 * will remove and clean up this subscription. */
\r
293 pSubscription->unsubscribed = true;
\r
308 /*-----------------------------------------------------------*/
\r
310 IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,
\r
311 uint16_t subscribePacketIdentifier,
\r
312 const IotMqttSubscription_t * pSubscriptionList,
\r
313 size_t subscriptionCount )
\r
315 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
317 _mqttSubscription_t * pNewSubscription = NULL;
\r
318 IotLink_t * pSubscriptionLink = NULL;
\r
319 _topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };
\r
321 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
323 for( i = 0; i < subscriptionCount; i++ )
\r
325 /* Check if this topic filter is already registered. */
\r
326 topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
\r
327 topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
\r
328 pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
331 &topicMatchParams );
\r
333 if( pSubscriptionLink != NULL )
\r
335 pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
337 /* The lengths of exactly matching topic filters must match. */
\r
338 IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );
\r
340 /* Replace the callback and packet info with the new parameters. */
\r
341 pNewSubscription->callback = pSubscriptionList[ i ].callback;
\r
342 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
\r
343 pNewSubscription->packetInfo.order = i;
\r
347 /* Allocate memory for a new subscription. */
\r
348 pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +
\r
349 pSubscriptionList[ i ].topicFilterLength );
\r
351 if( pNewSubscription == NULL )
\r
353 status = IOT_MQTT_NO_MEMORY;
\r
358 /* Clear the new subscription. */
\r
359 ( void ) memset( pNewSubscription,
\r
361 sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );
\r
363 /* Set the members of the new subscription and add it to the list. */
\r
364 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
\r
365 pNewSubscription->packetInfo.order = i;
\r
366 pNewSubscription->callback = pSubscriptionList[ i ].callback;
\r
367 pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;
\r
368 ( void ) memcpy( pNewSubscription->pTopicFilter,
\r
369 pSubscriptionList[ i ].pTopicFilter,
\r
370 ( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );
\r
372 IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),
\r
373 &( pNewSubscription->link ) );
\r
378 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
380 /* If memory allocation failed, remove all previously added subscriptions. */
\r
381 if( status != IOT_MQTT_SUCCESS )
\r
383 _IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,
\r
395 /*-----------------------------------------------------------*/
\r
397 void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,
\r
398 IotMqttCallbackParam_t * pCallbackParam )
\r
400 _mqttSubscription_t * pSubscription = NULL;
\r
401 IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;
\r
402 void * pCallbackContext = NULL;
\r
404 void ( * callbackFunction )( void *,
\r
405 IotMqttCallbackParam_t * ) = NULL;
\r
406 _topicMatchParams_t topicMatchParams = { 0 };
\r
408 /* Set the members of the search parameter. */
\r
409 topicMatchParams.pTopicName = pCallbackParam->u.message.info.pTopicName;
\r
410 topicMatchParams.topicNameLength = pCallbackParam->u.message.info.topicNameLength;
\r
411 topicMatchParams.exactMatchOnly = false;
\r
413 /* Prevent any other thread from modifying the subscription list while this
\r
414 * function is searching. */
\r
415 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
417 /* Search the subscription list for all matching subscriptions starting at
\r
418 * the list head. */
\r
421 pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
424 &topicMatchParams );
\r
426 /* No subscription found. Exit loop. */
\r
427 if( pCurrentLink == NULL )
\r
436 /* Subscription found. Calculate pointer to subscription object. */
\r
437 pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );
\r
439 /* Subscription validation should not have allowed a NULL callback function. */
\r
440 IotMqtt_Assert( pSubscription->callback.function != NULL );
\r
442 /* Increment the subscription's reference count. */
\r
443 ( pSubscription->references )++;
\r
445 /* Copy the necessary members of the subscription before releasing the
\r
446 * subscription list mutex. */
\r
447 pCallbackContext = pSubscription->callback.pCallbackContext;
\r
448 callbackFunction = pSubscription->callback.function;
\r
450 /* Unlock the subscription list mutex. */
\r
451 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
453 /* Set the members of the callback parameter. */
\r
454 pCallbackParam->mqttConnection = pMqttConnection;
\r
455 pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;
\r
456 pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;
\r
458 /* Invoke the subscription callback. */
\r
459 callbackFunction( pCallbackContext, pCallbackParam );
\r
461 /* Lock the subscription list mutex to decrement the reference count. */
\r
462 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
464 /* Decrement the reference count. It must still be positive. */
\r
465 ( pSubscription->references )--;
\r
466 IotMqtt_Assert( pSubscription->references >= 0 );
\r
468 /* Save the pointer to the next link in case this subscription is freed. */
\r
469 pNextLink = pCurrentLink->pNext;
\r
471 /* Remove this subscription if it has no references and the unsubscribed
\r
473 if( pSubscription->unsubscribed == true )
\r
475 /* An unsubscribed subscription should have been removed from the list. */
\r
476 IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );
\r
478 /* Free subscriptions with no references. */
\r
479 if( pSubscription->references == 0 )
\r
481 IotMqtt_FreeSubscription( pSubscription );
\r
493 /* Move current link pointer. */
\r
494 pCurrentLink = pNextLink;
\r
497 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
499 _IotMqtt_DecrementConnectionReferences( pMqttConnection );
\r
502 /*-----------------------------------------------------------*/
\r
504 void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,
\r
505 uint16_t packetIdentifier,
\r
508 _packetMatchParams_t packetMatchParams = { 0 };
\r
510 /* Set the members of the search parameter. */
\r
511 packetMatchParams.packetIdentifier = packetIdentifier;
\r
512 packetMatchParams.order = order;
\r
514 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
515 IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
\r
517 ( void * ) ( &packetMatchParams ),
\r
518 IotMqtt_FreeSubscription,
\r
519 offsetof( _mqttSubscription_t, link ) );
\r
520 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
523 /*-----------------------------------------------------------*/
\r
525 void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,
\r
526 const IotMqttSubscription_t * pSubscriptionList,
\r
527 size_t subscriptionCount )
\r
530 _mqttSubscription_t * pSubscription = NULL;
\r
531 IotLink_t * pSubscriptionLink = NULL;
\r
532 _topicMatchParams_t topicMatchParams = { 0 };
\r
534 /* Prevent any other thread from modifying the subscription list while this
\r
535 * function is running. */
\r
536 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
538 /* Find and remove each topic filter from the list. */
\r
539 for( i = 0; i < subscriptionCount; i++ )
\r
541 topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
\r
542 topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
\r
543 topicMatchParams.exactMatchOnly = true;
\r
545 pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
548 &topicMatchParams );
\r
550 if( pSubscriptionLink != NULL )
\r
552 pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
554 /* Reference count must not be negative. */
\r
555 IotMqtt_Assert( pSubscription->references >= 0 );
\r
557 /* Remove subscription from list. */
\r
558 IotListDouble_Remove( pSubscriptionLink );
\r
560 /* Check the reference count. This subscription cannot be removed if
\r
561 * there are subscription callbacks using it. */
\r
562 if( pSubscription->references > 0 )
\r
564 /* Set the unsubscribed flag. The last active subscription callback
\r
565 * will remove and clean up this subscription. */
\r
566 pSubscription->unsubscribed = true;
\r
570 /* Free a subscription with no references. */
\r
571 IotMqtt_FreeSubscription( pSubscription );
\r
580 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
583 /*-----------------------------------------------------------*/
\r
585 bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
\r
586 const char * pTopicFilter,
\r
587 uint16_t topicFilterLength,
\r
588 IotMqttSubscription_t * const pCurrentSubscription )
\r
590 bool status = false;
\r
591 _mqttSubscription_t * pSubscription = NULL;
\r
592 IotLink_t * pSubscriptionLink = NULL;
\r
593 _topicMatchParams_t topicMatchParams = { 0 };
\r
595 /* Set the members of the search parameter. */
\r
596 topicMatchParams.pTopicName = pTopicFilter;
\r
597 topicMatchParams.topicNameLength = topicFilterLength;
\r
598 topicMatchParams.exactMatchOnly = true;
\r
600 /* Prevent any other thread from modifying the subscription list while this
\r
601 * function is running. */
\r
602 IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );
\r
604 /* Search for a matching subscription. */
\r
605 pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),
\r
608 &topicMatchParams );
\r
610 /* Check if a matching subscription was found. */
\r
611 if( pSubscriptionLink != NULL )
\r
613 pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
615 /* Copy the matching subscription to the output parameter. */
\r
616 if( pCurrentSubscription != NULL )
\r
618 pCurrentSubscription->pTopicFilter = pTopicFilter;
\r
619 pCurrentSubscription->topicFilterLength = topicFilterLength;
\r
620 pCurrentSubscription->qos = IOT_MQTT_QOS_0;
\r
621 pCurrentSubscription->callback = pSubscription->callback;
\r
635 IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );
\r
640 /*-----------------------------------------------------------*/
\r
642 /* Provide access to internal functions and variables if testing. */
\r
643 #if IOT_BUILD_TESTS == 1
\r
644 #include "iot_test_access_mqtt_subscription.c"
\r