2 * Amazon FreeRTOS MQTT V2.0.0
\r
3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://aws.amazon.com/freertos
\r
23 * http://www.FreeRTOS.org
\r
27 * @file iot_mqtt_subscription.c
\r
28 * @brief Implements functions that manage subscriptions for an MQTT connection.
\r
31 /* The config header is always included first. */
\r
32 #include "iot_config.h"
\r
34 /* Standard includes. */
\r
35 #include <stdbool.h>
\r
38 /* Error handling include. */
\r
39 #include "private/iot_error.h"
\r
41 /* MQTT internal include. */
\r
42 #include "private/iot_mqtt_internal.h"
\r
44 /* Platform layer includes. */
\r
45 #include "platform/iot_threads.h"
\r
47 /*-----------------------------------------------------------*/
\r
50 * @brief First parameter to #_topicMatch.
\r
52 typedef struct _topicMatchParams
\r
54 const char * pTopicName; /**< @brief The topic name to parse. */
\r
55 uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */
\r
56 bool exactMatchOnly; /**< @brief Whether to allow wildcards or require exact matches. */
\r
57 } _topicMatchParams_t;
\r
60 * @brief First parameter to #_packetMatch.
\r
62 typedef struct _packetMatchParams
\r
64 uint16_t packetIdentifier; /**< Packet identifier to match. */
\r
65 int32_t order; /**< Order to match. Set to `-1` to ignore. */
\r
66 } _packetMatchParams_t;
\r
68 /*-----------------------------------------------------------*/
\r
71 * @brief Matches a topic name (from a publish) with a topic filter (from a
\r
74 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
75 * @param[in] pMatch Pointer to a #_topicMatchParams_t.
\r
77 * @return `true` if the arguments match the subscription topic filter; `false`
\r
80 static bool _topicMatch( const IotLink_t * pSubscriptionLink,
\r
84 * @brief Matches a packet identifier and order.
\r
86 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
87 * @param[in] pMatch Pointer to a #_packetMatchParams_t.
\r
89 * @return `true` if the arguments match the subscription's packet info; `false`
\r
92 static bool _packetMatch( const IotLink_t * pSubscriptionLink,
\r
95 /*-----------------------------------------------------------*/
\r
97 static bool _topicMatch( const IotLink_t * pSubscriptionLink,
\r
100 IOT_FUNCTION_ENTRY( bool, false );
\r
101 uint16_t nameIndex = 0, filterIndex = 0;
\r
103 /* Because this function is called from a container function, the given link
\r
104 * must never be NULL. */
\r
105 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
107 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
110 _topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;
\r
112 /* Extract the relevant strings and lengths from parameters. */
\r
113 const char * pTopicName = pParam->pTopicName;
\r
114 const char * pTopicFilter = pSubscription->pTopicFilter;
\r
115 const uint16_t topicNameLength = pParam->topicNameLength;
\r
116 const uint16_t topicFilterLength = pSubscription->topicFilterLength;
\r
118 /* Check for an exact match. */
\r
119 if( topicNameLength == topicFilterLength )
\r
121 status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );
\r
123 IOT_GOTO_CLEANUP();
\r
130 /* If the topic lengths are different but an exact match is required, return
\r
132 if( pParam->exactMatchOnly == true )
\r
134 IOT_SET_AND_GOTO_CLEANUP( false );
\r
141 while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
\r
143 /* Check if the character in the topic name matches the corresponding
\r
144 * character in the topic filter string. */
\r
145 if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
\r
147 /* Handle special corner cases as documented by the MQTT protocol spec. */
\r
149 /* Filter "sport/#" also matches "sport" since # includes the parent level. */
\r
150 if( nameIndex == topicNameLength - 1 )
\r
152 if( filterIndex == topicFilterLength - 3 )
\r
154 if( pTopicFilter[ filterIndex + 1 ] == '/' )
\r
156 if( pTopicFilter[ filterIndex + 2 ] == '#' )
\r
158 IOT_SET_AND_GOTO_CLEANUP( true );
\r
180 /* Filter "sport/+" also matches "sport/" but not "sport". */
\r
181 if( nameIndex == topicNameLength - 1 )
\r
183 if( filterIndex == topicFilterLength - 2 )
\r
185 if( pTopicFilter[ filterIndex + 1 ] == '+' )
\r
187 IOT_SET_AND_GOTO_CLEANUP( true );
\r
206 /* Check for wildcards. */
\r
207 if( pTopicFilter[ filterIndex ] == '+' )
\r
209 /* Move topic name index to the end of the current level.
\r
210 * This is identified by '/'. */
\r
211 while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )
\r
216 /* Increment filter index to skip '/'. */
\r
220 else if( pTopicFilter[ filterIndex ] == '#' )
\r
222 /* Subsequent characters don't need to be checked for the
\r
223 * multi-level wildcard. */
\r
224 IOT_SET_AND_GOTO_CLEANUP( true );
\r
228 /* Any character mismatch other than '+' or '#' means the topic
\r
229 * name does not match the topic filter. */
\r
230 IOT_SET_AND_GOTO_CLEANUP( false );
\r
234 /* Increment indexes. */
\r
239 /* If the end of both strings has been reached, they match. */
\r
240 if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )
\r
242 IOT_SET_AND_GOTO_CLEANUP( true );
\r
249 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
252 /*-----------------------------------------------------------*/
\r
254 static bool _packetMatch( const IotLink_t * pSubscriptionLink,
\r
257 bool match = false;
\r
259 /* Because this function is called from a container function, the given link
\r
260 * must never be NULL. */
\r
261 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
263 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
266 _packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;
\r
268 /* Compare packet identifiers. */
\r
269 if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )
\r
271 /* Compare orders if order is not -1. */
\r
272 if( pParam->order == -1 )
\r
278 match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;
\r
282 /* If this subscription should be removed, check the reference count. */
\r
283 if( match == true )
\r
285 /* Reference count must not be negative. */
\r
286 IotMqtt_Assert( pSubscription->references >= 0 );
\r
288 /* If the reference count is positive, this subscription cannot be
\r
289 * removed yet because there are subscription callbacks using it. */
\r
290 if( pSubscription->references > 0 )
\r
294 /* Set the unsubscribed flag. The last active subscription callback
\r
295 * will remove and clean up this subscription. */
\r
296 pSubscription->unsubscribed = true;
\r
311 /*-----------------------------------------------------------*/
\r
313 IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,
\r
314 uint16_t subscribePacketIdentifier,
\r
315 const IotMqttSubscription_t * pSubscriptionList,
\r
316 size_t subscriptionCount )
\r
318 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
320 _mqttSubscription_t * pNewSubscription = NULL;
\r
321 IotLink_t * pSubscriptionLink = NULL;
\r
322 _topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };
\r
324 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
326 for( i = 0; i < subscriptionCount; i++ )
\r
328 /* Check if this topic filter is already registered. */
\r
329 topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
\r
330 topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
\r
331 pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
334 &topicMatchParams );
\r
336 if( pSubscriptionLink != NULL )
\r
338 pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
340 /* The lengths of exactly matching topic filters must match. */
\r
341 IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );
\r
343 /* Replace the callback and packet info with the new parameters. */
\r
344 pNewSubscription->callback = pSubscriptionList[ i ].callback;
\r
345 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
\r
346 pNewSubscription->packetInfo.order = i;
\r
350 /* Allocate memory for a new subscription. */
\r
351 pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +
\r
352 pSubscriptionList[ i ].topicFilterLength );
\r
354 if( pNewSubscription == NULL )
\r
356 status = IOT_MQTT_NO_MEMORY;
\r
361 /* Clear the new subscription. */
\r
362 ( void ) memset( pNewSubscription,
\r
364 sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );
\r
366 /* Set the members of the new subscription and add it to the list. */
\r
367 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
\r
368 pNewSubscription->packetInfo.order = i;
\r
369 pNewSubscription->callback = pSubscriptionList[ i ].callback;
\r
370 pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;
\r
371 ( void ) memcpy( pNewSubscription->pTopicFilter,
\r
372 pSubscriptionList[ i ].pTopicFilter,
\r
373 ( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );
\r
375 IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),
\r
376 &( pNewSubscription->link ) );
\r
381 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
383 /* If memory allocation failed, remove all previously added subscriptions. */
\r
384 if( status != IOT_MQTT_SUCCESS )
\r
386 _IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,
\r
398 /*-----------------------------------------------------------*/
\r
400 void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,
\r
401 IotMqttCallbackParam_t * pCallbackParam )
\r
403 _mqttSubscription_t * pSubscription = NULL;
\r
404 IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;
\r
405 void * pCallbackContext = NULL;
\r
407 void ( * callbackFunction )( void *,
\r
408 IotMqttCallbackParam_t * ) = NULL;
\r
409 _topicMatchParams_t topicMatchParams =
\r
411 .pTopicName = pCallbackParam->u.message.info.pTopicName,
\r
412 .topicNameLength = pCallbackParam->u.message.info.topicNameLength,
\r
413 .exactMatchOnly = false
\r
416 /* Prevent any other thread from modifying the subscription list while this
\r
417 * function is searching. */
\r
418 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
420 /* Search the subscription list for all matching subscriptions starting at
\r
421 * the list head. */
\r
424 pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
427 &topicMatchParams );
\r
429 /* No subscription found. Exit loop. */
\r
430 if( pCurrentLink == NULL )
\r
439 /* Subscription found. Calculate pointer to subscription object. */
\r
440 pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );
\r
442 /* Subscription validation should not have allowed a NULL callback function. */
\r
443 IotMqtt_Assert( pSubscription->callback.function != NULL );
\r
445 /* Increment the subscription's reference count. */
\r
446 ( pSubscription->references )++;
\r
448 /* Copy the necessary members of the subscription before releasing the
\r
449 * subscription list mutex. */
\r
450 pCallbackContext = pSubscription->callback.pCallbackContext;
\r
451 callbackFunction = pSubscription->callback.function;
\r
453 /* Unlock the subscription list mutex. */
\r
454 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
456 /* Set the members of the callback parameter. */
\r
457 pCallbackParam->mqttConnection = pMqttConnection;
\r
458 pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;
\r
459 pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;
\r
461 /* Invoke the subscription callback. */
\r
462 callbackFunction( pCallbackContext, pCallbackParam );
\r
464 /* Lock the subscription list mutex to decrement the reference count. */
\r
465 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
467 /* Decrement the reference count. It must not become negative. */
\r
468 ( pSubscription->references )--;
\r
469 IotMqtt_Assert( pSubscription->references >= 0 );
\r
471 /* Save the pointer to the next link in case this subscription is freed. */
\r
472 pNextLink = pCurrentLink->pNext;
\r
474 /* Remove this subscription if it has no references and the unsubscribed
\r
476 if( pSubscription->unsubscribed == true )
\r
478 /* An unsubscribed subscription should have been removed from the list. */
\r
479 IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );
\r
481 /* Free subscriptions with no references. */
\r
482 if( pSubscription->references == 0 )
\r
484 IotMqtt_FreeSubscription( pSubscription );
\r
496 /* Move current link pointer. */
\r
497 pCurrentLink = pNextLink;
\r
500 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
502 _IotMqtt_DecrementConnectionReferences( pMqttConnection );
\r
505 /*-----------------------------------------------------------*/
\r
507 void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,
\r
508 uint16_t packetIdentifier,
\r
511 const _packetMatchParams_t packetMatchParams =
\r
513 .packetIdentifier = packetIdentifier,
\r
517 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
518 IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
\r
520 ( void * ) ( &packetMatchParams ),
\r
521 IotMqtt_FreeSubscription,
\r
522 offsetof( _mqttSubscription_t, link ) );
\r
523 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
526 /*-----------------------------------------------------------*/
\r
528 void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,
\r
529 const IotMqttSubscription_t * pSubscriptionList,
\r
530 size_t subscriptionCount )
\r
533 _mqttSubscription_t * pSubscription = NULL;
\r
534 IotLink_t * pSubscriptionLink = NULL;
\r
535 _topicMatchParams_t topicMatchParams = { 0 };
\r
537 /* Prevent any other thread from modifying the subscription list while this
\r
538 * function is running. */
\r
539 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
541 /* Find and remove each topic filter from the list. */
\r
542 for( i = 0; i < subscriptionCount; i++ )
\r
544 topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
\r
545 topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
\r
546 topicMatchParams.exactMatchOnly = true;
\r
548 pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
551 &topicMatchParams );
\r
553 if( pSubscriptionLink != NULL )
\r
555 pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
557 /* Reference count must not be negative. */
\r
558 IotMqtt_Assert( pSubscription->references >= 0 );
\r
560 /* Remove subscription from list. */
\r
561 IotListDouble_Remove( pSubscriptionLink );
\r
563 /* Check the reference count. This subscription cannot be freed if
\r
564 * there are subscription callbacks using it. */
\r
565 if( pSubscription->references > 0 )
\r
567 /* Set the unsubscribed flag. The last active subscription callback
\r
568 * will remove and clean up this subscription. */
\r
569 pSubscription->unsubscribed = true;
\r
573 /* Free a subscription with no references. */
\r
574 IotMqtt_FreeSubscription( pSubscription );
\r
583 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
586 /*-----------------------------------------------------------*/
\r
588 bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
\r
589 const char * pTopicFilter,
\r
590 uint16_t topicFilterLength,
\r
591 IotMqttSubscription_t * pCurrentSubscription )
\r
593 bool status = false;
\r
594 _mqttSubscription_t * pSubscription = NULL;
\r
595 IotLink_t * pSubscriptionLink = NULL;
\r
596 _topicMatchParams_t topicMatchParams =
\r
598 .pTopicName = pTopicFilter,
\r
599 .topicNameLength = topicFilterLength,
\r
600 .exactMatchOnly = true
\r
603 /* Prevent any other thread from modifying the subscription list while this
\r
604 * function is running. */
\r
605 IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );
\r
607 /* Search for a matching subscription. */
\r
608 pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),
\r
611 &topicMatchParams );
\r
613 /* Check if a matching subscription was found. */
\r
614 if( pSubscriptionLink != NULL )
\r
616 pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
618 /* Copy the matching subscription to the output parameter. */
\r
619 if( pCurrentSubscription != NULL )
\r
621 pCurrentSubscription->pTopicFilter = pTopicFilter;
\r
622 pCurrentSubscription->topicFilterLength = topicFilterLength;
\r
623 pCurrentSubscription->qos = IOT_MQTT_QOS_0;
\r
624 pCurrentSubscription->callback = pSubscription->callback;
\r
638 IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );
\r
643 /*-----------------------------------------------------------*/
\r
645 /* Provide access to internal functions and variables if testing. */
\r
646 #if IOT_BUILD_TESTS == 1
\r
647 #include "iot_test_access_mqtt_subscription.c"
\r