2 * Amazon FreeRTOS MQTT V2.0.0
\r
3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://aws.amazon.com/freertos
\r
23 * http://www.FreeRTOS.org
\r
27 * @file iot_mqtt_subscription.c
\r
28 * @brief Implements functions that manage subscriptions for an MQTT connection.
\r
31 /* The config header is always included first. */
\r
32 #include "iot_config.h"
\r
34 /* Standard includes. */
\r
35 #include <stdbool.h>
\r
38 /* Error handling include. */
\r
39 #include "private/iot_error.h"
\r
41 /* MQTT internal include. */
\r
42 #include "private/iot_mqtt_internal.h"
\r
44 /* Platform layer includes. */
\r
45 #include "platform/iot_threads.h"
\r
47 /*-----------------------------------------------------------*/
\r
50 * @brief First parameter to #_topicMatch.
\r
52 typedef struct _topicMatchParams
\r
54 const char * pTopicName; /**< @brief The topic name to parse. */
\r
55 uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */
\r
56 bool exactMatchOnly; /**< @brief Whether to allow wildcards or require exact matches. */
\r
57 } _topicMatchParams_t;
\r
60 * @brief First parameter to #_packetMatch.
\r
62 typedef struct _packetMatchParams
\r
64 uint16_t packetIdentifier; /**< Packet identifier to match. */
\r
65 int32_t order; /**< Order to match. Set to `-1` to ignore. */
\r
66 } _packetMatchParams_t;
\r
68 /*-----------------------------------------------------------*/
\r
71 * @brief Matches a topic name (from a publish) with a topic filter (from a
\r
74 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
75 * @param[in] pMatch Pointer to a #_topicMatchParams_t.
\r
77 * @return `true` if the arguments match the subscription topic filter; `false`
\r
80 static bool _topicMatch( const IotLink_t * pSubscriptionLink,
\r
84 * @brief Matches a packet identifier and order.
\r
86 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
87 * @param[in] pMatch Pointer to a #_packetMatchParams_t.
\r
89 * @return `true` if the arguments match the subscription's packet info; `false`
\r
92 static bool _packetMatch( const IotLink_t * pSubscriptionLink,
\r
95 /*-----------------------------------------------------------*/
\r
97 static bool _topicMatch( const IotLink_t * pSubscriptionLink,
\r
100 IOT_FUNCTION_ENTRY( bool, false );
\r
101 uint16_t nameIndex = 0, filterIndex = 0;
\r
103 /* Because this function is called from a container function, the given link
\r
104 * must never be NULL. */
\r
105 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
107 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
110 _topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;
\r
112 /* Extract the relevant strings and lengths from parameters. */
\r
113 const char * pTopicName = pParam->pTopicName;
\r
114 const char * pTopicFilter = pSubscription->pTopicFilter;
\r
115 const uint16_t topicNameLength = pParam->topicNameLength;
\r
116 const uint16_t topicFilterLength = pSubscription->topicFilterLength;
\r
118 /* Check for an exact match. */
\r
119 if( topicNameLength == topicFilterLength )
\r
121 status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );
\r
123 IOT_GOTO_CLEANUP();
\r
130 /* If the topic lengths are different but an exact match is required, return
\r
132 if( pParam->exactMatchOnly == true )
\r
134 IOT_SET_AND_GOTO_CLEANUP( false );
\r
141 while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
\r
143 /* Check if the character in the topic name matches the corresponding
\r
144 * character in the topic filter string. */
\r
145 if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
\r
147 /* Handle special corner cases as documented by the MQTT protocol spec. */
\r
149 /* Filter "sport/#" also matches "sport" since # includes the parent level. */
\r
150 if( nameIndex == topicNameLength - 1 )
\r
152 if( filterIndex == topicFilterLength - 3 )
\r
154 if( pTopicFilter[ filterIndex + 1 ] == '/' )
\r
156 if( pTopicFilter[ filterIndex + 2 ] == '#' )
\r
158 IOT_SET_AND_GOTO_CLEANUP( true );
\r
180 /* Filter "sport/+" also matches the "sport/" but not "sport". */
\r
181 if( nameIndex == topicNameLength - 1 )
\r
183 if( filterIndex == topicFilterLength - 2 )
\r
185 if( pTopicFilter[ filterIndex + 1 ] == '+' )
\r
187 IOT_SET_AND_GOTO_CLEANUP( true );
\r
206 /* Check for wildcards. */
\r
207 if( pTopicFilter[ filterIndex ] == '+' )
\r
209 /* Move topic name index to the end of the current level.
\r
210 * This is identified by '/'. */
\r
211 while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )
\r
216 /* Increment filter index to skip '/'. */
\r
220 else if( pTopicFilter[ filterIndex ] == '#' )
\r
222 /* Subsequent characters don't need to be checked if the for the
\r
223 * multi-level wildcard. */
\r
224 IOT_SET_AND_GOTO_CLEANUP( true );
\r
228 /* Any character mismatch other than '+' or '#' means the topic
\r
229 * name does not match the topic filter. */
\r
230 IOT_SET_AND_GOTO_CLEANUP( false );
\r
234 /* Increment indexes. */
\r
239 /* If the end of both strings has been reached, they match. */
\r
240 if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )
\r
242 IOT_SET_AND_GOTO_CLEANUP( true );
\r
249 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
252 /*-----------------------------------------------------------*/
\r
254 static bool _packetMatch( const IotLink_t * pSubscriptionLink,
\r
257 bool match = false;
\r
259 /* Because this function is called from a container function, the given link
\r
260 * must never be NULL. */
\r
261 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
263 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
266 _packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;
\r
268 /* Compare packet identifiers. */
\r
269 if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )
\r
271 /* Compare orders if order is not -1. */
\r
272 if( pParam->order == -1 )
\r
278 match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;
\r
282 /* If this subscription should be removed, check the reference count. */
\r
283 if( match == true )
\r
285 /* Reference count must not be negative. */
\r
286 IotMqtt_Assert( pSubscription->references >= 0 );
\r
288 /* If the reference count is positive, this subscription cannot be
\r
289 * removed yet because there are subscription callbacks using it. */
\r
290 if( pSubscription->references > 0 )
\r
294 /* Set the unsubscribed flag. The last active subscription callback
\r
295 * will remove and clean up this subscription. */
\r
296 pSubscription->unsubscribed = true;
\r
311 /*-----------------------------------------------------------*/
\r
313 IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,
\r
314 uint16_t subscribePacketIdentifier,
\r
315 const IotMqttSubscription_t * pSubscriptionList,
\r
316 size_t subscriptionCount )
\r
318 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
320 _mqttSubscription_t * pNewSubscription = NULL;
\r
321 IotLink_t * pSubscriptionLink = NULL;
\r
322 _topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };
\r
324 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
326 for( i = 0; i < subscriptionCount; i++ )
\r
328 /* Check if this topic filter is already registered. */
\r
329 topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
\r
330 topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
\r
331 pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
334 &topicMatchParams );
\r
336 if( pSubscriptionLink != NULL )
\r
338 pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
340 /* The lengths of exactly matching topic filters must match. */
\r
341 IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );
\r
343 /* Replace the callback and packet info with the new parameters. */
\r
344 pNewSubscription->callback = pSubscriptionList[ i ].callback;
\r
345 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
\r
346 pNewSubscription->packetInfo.order = i;
\r
350 /* Allocate memory for a new subscription. */
\r
351 pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +
\r
352 pSubscriptionList[ i ].topicFilterLength );
\r
354 if( pNewSubscription == NULL )
\r
356 status = IOT_MQTT_NO_MEMORY;
\r
361 /* Clear the new subscription. */
\r
362 ( void ) memset( pNewSubscription,
\r
364 sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );
\r
366 /* Set the members of the new subscription and add it to the list. */
\r
367 pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
\r
368 pNewSubscription->packetInfo.order = i;
\r
369 pNewSubscription->callback = pSubscriptionList[ i ].callback;
\r
370 pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;
\r
371 ( void ) memcpy( pNewSubscription->pTopicFilter,
\r
372 pSubscriptionList[ i ].pTopicFilter,
\r
373 ( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );
\r
375 IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),
\r
376 &( pNewSubscription->link ) );
\r
381 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
383 /* If memory allocation failed, remove all previously added subscriptions. */
\r
384 if( status != IOT_MQTT_SUCCESS )
\r
386 _IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,
\r
398 /*-----------------------------------------------------------*/
\r
400 void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,
\r
401 IotMqttCallbackParam_t * pCallbackParam )
\r
403 _mqttSubscription_t * pSubscription = NULL;
\r
404 IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;
\r
405 void * pCallbackContext = NULL;
\r
407 void ( * callbackFunction )( void *,
\r
408 IotMqttCallbackParam_t * ) = NULL;
\r
409 _topicMatchParams_t topicMatchParams = { 0 };
\r
411 topicMatchParams.pTopicName = pCallbackParam->u.message.info.pTopicName;
\r
412 topicMatchParams.topicNameLength = pCallbackParam->u.message.info.topicNameLength;
\r
413 topicMatchParams.exactMatchOnly = false;
\r
415 /* Prevent any other thread from modifying the subscription list while this
\r
416 * function is searching. */
\r
417 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
419 /* Search the subscription list for all matching subscriptions starting at
\r
420 * the list head. */
\r
423 pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
426 &topicMatchParams );
\r
428 /* No subscription found. Exit loop. */
\r
429 if( pCurrentLink == NULL )
\r
438 /* Subscription found. Calculate pointer to subscription object. */
\r
439 pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );
\r
441 /* Subscription validation should not have allowed a NULL callback function. */
\r
442 IotMqtt_Assert( pSubscription->callback.function != NULL );
\r
444 /* Increment the subscription's reference count. */
\r
445 ( pSubscription->references )++;
\r
447 /* Copy the necessary members of the subscription before releasing the
\r
448 * subscription list mutex. */
\r
449 pCallbackContext = pSubscription->callback.pCallbackContext;
\r
450 callbackFunction = pSubscription->callback.function;
\r
452 /* Unlock the subscription list mutex. */
\r
453 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
455 /* Set the members of the callback parameter. */
\r
456 pCallbackParam->mqttConnection = pMqttConnection;
\r
457 pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;
\r
458 pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;
\r
460 /* Invoke the subscription callback. */
\r
461 callbackFunction( pCallbackContext, pCallbackParam );
\r
463 /* Lock the subscription list mutex to decrement the reference count. */
\r
464 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
466 /* Decrement the reference count. It must still be positive. */
\r
467 ( pSubscription->references )--;
\r
468 IotMqtt_Assert( pSubscription->references >= 0 );
\r
470 /* Save the pointer to the next link in case this subscription is freed. */
\r
471 pNextLink = pCurrentLink->pNext;
\r
473 /* Remove this subscription if it has no references and the unsubscribed
\r
475 if( pSubscription->unsubscribed == true )
\r
477 /* An unsubscribed subscription should have been removed from the list. */
\r
478 IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );
\r
480 /* Free subscriptions with no references. */
\r
481 if( pSubscription->references == 0 )
\r
483 IotMqtt_FreeSubscription( pSubscription );
\r
495 /* Move current link pointer. */
\r
496 pCurrentLink = pNextLink;
\r
499 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
501 _IotMqtt_DecrementConnectionReferences( pMqttConnection );
\r
504 /*-----------------------------------------------------------*/
\r
506 void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,
\r
507 uint16_t packetIdentifier,
\r
510 _packetMatchParams_t packetMatchParams = { 0 };
\r
512 packetMatchParams.packetIdentifier = packetIdentifier;
\r
513 packetMatchParams.order = order;
\r
515 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
516 IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
\r
518 ( void * ) ( &packetMatchParams ),
\r
519 IotMqtt_FreeSubscription,
\r
520 offsetof( _mqttSubscription_t, link ) );
\r
521 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
524 /*-----------------------------------------------------------*/
\r
526 void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,
\r
527 const IotMqttSubscription_t * pSubscriptionList,
\r
528 size_t subscriptionCount )
\r
531 _mqttSubscription_t * pSubscription = NULL;
\r
532 IotLink_t * pSubscriptionLink = NULL;
\r
533 _topicMatchParams_t topicMatchParams = { 0 };
\r
535 /* Prevent any other thread from modifying the subscription list while this
\r
536 * function is running. */
\r
537 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
539 /* Find and remove each topic filter from the list. */
\r
540 for( i = 0; i < subscriptionCount; i++ )
\r
542 topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
\r
543 topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
\r
544 topicMatchParams.exactMatchOnly = true;
\r
546 pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
\r
549 &topicMatchParams );
\r
551 if( pSubscriptionLink != NULL )
\r
553 pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
555 /* Reference count must not be negative. */
\r
556 IotMqtt_Assert( pSubscription->references >= 0 );
\r
558 /* Remove subscription from list. */
\r
559 IotListDouble_Remove( pSubscriptionLink );
\r
561 /* Check the reference count. This subscription cannot be removed if
\r
562 * there are subscription callbacks using it. */
\r
563 if( pSubscription->references > 0 )
\r
565 /* Set the unsubscribed flag. The last active subscription callback
\r
566 * will remove and clean up this subscription. */
\r
567 pSubscription->unsubscribed = true;
\r
571 /* Free a subscription with no references. */
\r
572 IotMqtt_FreeSubscription( pSubscription );
\r
581 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
584 /*-----------------------------------------------------------*/
\r
586 bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
\r
587 const char * pTopicFilter,
\r
588 uint16_t topicFilterLength,
\r
589 IotMqttSubscription_t * pCurrentSubscription )
\r
591 bool status = false;
\r
592 _mqttSubscription_t * pSubscription = NULL;
\r
593 IotLink_t * pSubscriptionLink = NULL;
\r
594 _topicMatchParams_t topicMatchParams = { 0 };
\r
596 topicMatchParams.pTopicName = pTopicFilter;
\r
597 topicMatchParams.topicNameLength = topicFilterLength;
\r
598 topicMatchParams.exactMatchOnly = false;
\r
600 /* Prevent any other thread from modifying the subscription list while this
\r
601 * function is running. */
\r
602 IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );
\r
604 /* Search for a matching subscription. */
\r
605 pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),
\r
608 &topicMatchParams );
\r
610 /* Check if a matching subscription was found. */
\r
611 if( pSubscriptionLink != NULL )
\r
613 pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
\r
615 /* Copy the matching subscription to the output parameter. */
\r
616 if( pCurrentSubscription != NULL )
\r
618 pCurrentSubscription->pTopicFilter = pTopicFilter;
\r
619 pCurrentSubscription->topicFilterLength = topicFilterLength;
\r
620 pCurrentSubscription->qos = IOT_MQTT_QOS_0;
\r
621 pCurrentSubscription->callback = pSubscription->callback;
\r
635 IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );
\r
640 /*-----------------------------------------------------------*/
\r
642 /* Provide access to internal functions and variables if testing. */
\r
643 #if IOT_BUILD_TESTS == 1
\r
644 #include "iot_test_access_mqtt_subscription.c"
\r