2 * Amazon FreeRTOS MQTT V2.0.0
\r
3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://aws.amazon.com/freertos
\r
23 * http://www.FreeRTOS.org
\r
27 * @file iot_mqtt_api.c
\r
28 * @brief Implements most user-facing functions of the MQTT library.
\r
31 /* The config header is always included first. */
\r
32 #include "iot_config.h"
\r
34 /* Standard includes. */
\r
37 /* Error handling include. */
\r
38 #include "private/iot_error.h"
\r
40 /* MQTT internal include. */
\r
41 #include "private/iot_mqtt_internal.h"
\r
43 /* Platform layer includes. */
\r
44 #include "platform/iot_clock.h"
\r
45 #include "platform/iot_threads.h"
\r
47 /* Validate MQTT configuration settings. */
\r
48 #if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1
\r
49 #error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1."
\r
51 #if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1
\r
52 #error "IOT_MQTT_ENABLE_METRICS must be 0 or 1."
\r
54 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1
\r
55 #error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1."
\r
57 #if IOT_MQTT_RESPONSE_WAIT_MS <= 0
\r
58 #error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative."
\r
60 #if IOT_MQTT_RETRY_MS_CEILING <= 0
\r
61 #error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative."
\r
64 /*-----------------------------------------------------------*/
\r
67 * @brief Set the unsubscribed flag of an MQTT subscription.
\r
69 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
70 * @param[in] pMatch Not used.
\r
72 * @return Always returns `true`.
\r
74 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,
\r
78 * @brief Destroy an MQTT subscription if its reference count is 0.
\r
80 * @param[in] pData The subscription to destroy. This parameter is of type
\r
81 * `void*` for compatibility with [free]
\r
82 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
84 static void _mqttSubscription_tryDestroy( void * pData );
\r
87 * @brief Decrement the reference count of an MQTT operation and attempt to
\r
90 * @param[in] pData The operation data to destroy. This parameter is of type
\r
91 * `void*` for compatibility with [free]
\r
92 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
94 static void _mqttOperation_tryDestroy( void * pData );
\r
97 * @brief Initialize the keep-alive operation for an MQTT connection.
\r
99 * @param[in] pNetworkInfo User-provided network information for the new
\r
101 * @param[in] keepAliveSeconds User-provided keep-alive interval.
\r
102 * @param[out] pMqttConnection The MQTT connection associated with the keep-alive.
\r
104 * @return `true` if the keep-alive job was successfully created; `false` otherwise.
\r
106 static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
107 uint16_t keepAliveSeconds,
\r
108 _mqttConnection_t * pMqttConnection );
\r
111 * @brief Creates a new MQTT connection and initializes its members.
\r
113 * @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server.
\r
114 * @param[in] pNetworkInfo User-provided network information for the new
\r
116 * @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection.
\r
118 * @return Pointer to a newly-created MQTT connection; `NULL` on failure.
\r
120 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
\r
121 const IotMqttNetworkInfo_t * pNetworkInfo,
\r
122 uint16_t keepAliveSeconds );
\r
125 * @brief Destroys the members of an MQTT connection.
\r
127 * @param[in] pMqttConnection Which connection to destroy.
\r
129 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection );
\r
132 * @brief The common component of both @ref mqtt_function_subscribe and @ref
\r
133 * mqtt_function_unsubscribe.
\r
135 * See @ref mqtt_function_subscribe or @ref mqtt_function_unsubscribe for a
\r
136 * description of the parameters and return values.
\r
138 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
\r
139 IotMqttConnection_t mqttConnection,
\r
140 const IotMqttSubscription_t * pSubscriptionList,
\r
141 size_t subscriptionCount,
\r
143 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
144 IotMqttOperation_t * const pOperationReference );
\r
146 /*-----------------------------------------------------------*/
\r
148 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,
\r
151 /* Because this function is called from a container function, the given link
\r
152 * must never be NULL. */
\r
153 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
155 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
159 /* Silence warnings about unused parameters. */
\r
162 /* Set the unsubscribed flag. */
\r
163 pSubscription->unsubscribed = true;
\r
168 /*-----------------------------------------------------------*/
\r
170 static void _mqttSubscription_tryDestroy( void * pData )
\r
172 _mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData;
\r
174 /* Reference count must not be negative. */
\r
175 IotMqtt_Assert( pSubscription->references >= 0 );
\r
177 /* Unsubscribed flag should be set. */
\r
178 IotMqtt_Assert( pSubscription->unsubscribed == true );
\r
180 /* Free the subscription if it has no references. */
\r
181 if( pSubscription->references == 0 )
\r
183 IotMqtt_FreeSubscription( pSubscription );
\r
191 /*-----------------------------------------------------------*/
\r
193 static void _mqttOperation_tryDestroy( void * pData )
\r
195 _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData;
\r
196 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
198 /* Incoming PUBLISH operations may always be freed. */
\r
199 if( pOperation->incomingPublish == true )
\r
201 /* Cancel the incoming PUBLISH operation's job. */
\r
202 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
206 /* If the operation's job was not canceled, it must be already executing.
\r
207 * Any other return value is invalid. */
\r
208 IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
\r
209 ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
\r
211 /* Check if the incoming PUBLISH job was canceled. */
\r
212 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
214 /* Job was canceled. Process incoming PUBLISH now to clean up. */
\r
215 _IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL,
\r
221 /* The executing job will process the PUBLISH, so nothing is done here. */
\r
227 /* Decrement reference count and destroy operation if possible. */
\r
228 if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true )
\r
230 _IotMqtt_DestroyOperation( pOperation );
\r
239 /*-----------------------------------------------------------*/
\r
241 static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
242 uint16_t keepAliveSeconds,
\r
243 _mqttConnection_t * pMqttConnection )
\r
245 bool status = true;
\r
246 IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
\r
247 IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS;
\r
249 /* Network information is not used when MQTT packet serializers are disabled. */
\r
250 ( void ) pNetworkInfo;
\r
252 /* Default PINGREQ serializer function. */
\r
253 IotMqttError_t ( * serializePingreq )( uint8_t **,
\r
254 size_t * ) = _IotMqtt_SerializePingreq;
\r
256 /* Set PINGREQ operation members. */
\r
257 pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;
\r
259 /* Convert the keep-alive interval to milliseconds. */
\r
260 pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;
\r
261 pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;
\r
263 /* Choose a PINGREQ serializer function. */
\r
264 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
265 if( pNetworkInfo->pMqttSerializer != NULL )
\r
267 if( pNetworkInfo->pMqttSerializer->serialize.pingreq != NULL )
\r
269 serializePingreq = pNetworkInfo->pMqttSerializer->serialize.pingreq;
\r
280 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
282 /* Generate a PINGREQ packet. */
\r
283 serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),
\r
284 &( pMqttConnection->pingreq.u.operation.packetSize ) );
\r
286 if( serializeStatus != IOT_MQTT_SUCCESS )
\r
288 IotLogError( "Failed to allocate PINGREQ packet for new connection." );
\r
294 /* Create the task pool job that processes keep-alive. */
\r
295 jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
\r
297 &( pMqttConnection->pingreq.jobStorage ),
\r
298 &( pMqttConnection->pingreq.job ) );
\r
300 /* Task pool job creation for a pre-allocated job should never fail.
\r
301 * Abort the program if it does. */
\r
302 if( jobStatus != IOT_TASKPOOL_SUCCESS )
\r
304 IotLogError( "Failed to create keep-alive job for new connection." );
\r
306 IotMqtt_Assert( false );
\r
313 /* Keep-alive references its MQTT connection, so increment reference. */
\r
314 ( pMqttConnection->references )++;
\r
320 /*-----------------------------------------------------------*/
\r
322 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
\r
323 const IotMqttNetworkInfo_t * pNetworkInfo,
\r
324 uint16_t keepAliveSeconds )
\r
326 IOT_FUNCTION_ENTRY( bool, true );
\r
327 _mqttConnection_t * pMqttConnection = NULL;
\r
328 bool referencesMutexCreated = false, subscriptionMutexCreated = false;
\r
330 /* Allocate memory for the new MQTT connection. */
\r
331 pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) );
\r
333 if( pMqttConnection == NULL )
\r
335 IotLogError( "Failed to allocate memory for new connection." );
\r
337 IOT_SET_AND_GOTO_CLEANUP( false );
\r
341 /* Clear the MQTT connection, then copy the MQTT server mode, network
\r
342 * interface, and disconnect callback. */
\r
343 ( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) );
\r
344 pMqttConnection->awsIotMqttMode = awsIotMqttMode;
\r
345 pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface;
\r
346 pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback;
\r
348 /* Start a new MQTT connection with a reference count of 1. */
\r
349 pMqttConnection->references = 1;
\r
352 /* Create the references mutex for a new connection. It is a recursive mutex. */
\r
353 referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true );
\r
355 if( referencesMutexCreated == false )
\r
357 IotLogError( "Failed to create references mutex for new connection." );
\r
359 IOT_SET_AND_GOTO_CLEANUP( false );
\r
366 /* Create the subscription mutex for a new connection. */
\r
367 subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false );
\r
369 if( subscriptionMutexCreated == false )
\r
371 IotLogError( "Failed to create subscription mutex for new connection." );
\r
373 IOT_SET_AND_GOTO_CLEANUP( false );
\r
380 /* Create the new connection's subscription and operation lists. */
\r
381 IotListDouble_Create( &( pMqttConnection->subscriptionList ) );
\r
382 IotListDouble_Create( &( pMqttConnection->pendingProcessing ) );
\r
383 IotListDouble_Create( &( pMqttConnection->pendingResponse ) );
\r
385 /* AWS IoT service limits set minimum and maximum values for keep-alive interval.
\r
386 * Adjust the user-provided keep-alive interval based on these requirements. */
\r
387 if( awsIotMqttMode == true )
\r
389 if( keepAliveSeconds < AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE )
\r
391 keepAliveSeconds = AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE;
\r
393 else if( keepAliveSeconds > AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE )
\r
395 keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;
\r
397 else if( keepAliveSeconds == 0 )
\r
399 keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;
\r
411 /* Check if keep-alive is active for this connection. */
\r
412 if( keepAliveSeconds != 0 )
\r
414 if( _createKeepAliveOperation( pNetworkInfo,
\r
416 pMqttConnection ) == false )
\r
418 IOT_SET_AND_GOTO_CLEANUP( false );
\r
430 /* Clean up mutexes and connection if this function failed. */
\r
431 IOT_FUNCTION_CLEANUP_BEGIN();
\r
433 if( status == false )
\r
435 if( subscriptionMutexCreated == true )
\r
437 IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );
\r
444 if( referencesMutexCreated == true )
\r
446 IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );
\r
453 if( pMqttConnection != NULL )
\r
455 IotMqtt_FreeConnection( pMqttConnection );
\r
456 pMqttConnection = NULL;
\r
468 return pMqttConnection;
\r
471 /*-----------------------------------------------------------*/
\r
473 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
\r
475 IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
\r
477 /* Default free packet function. */
\r
478 void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;
\r
480 /* Clean up keep-alive if still allocated. */
\r
481 if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
\r
483 IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );
\r
485 /* Choose a function to free the packet. */
\r
486 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
487 if( pMqttConnection->pSerializer != NULL )
\r
489 if( pMqttConnection->pSerializer->freePacket != NULL )
\r
491 freePacket = pMqttConnection->pSerializer->freePacket;
\r
496 freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );
\r
498 /* Clear data about the keep-alive. */
\r
499 pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
\r
500 pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
\r
501 pMqttConnection->pingreq.u.operation.packetSize = 0;
\r
503 /* Decrement reference count. */
\r
504 pMqttConnection->references--;
\r
511 /* A connection to be destroyed should have no keep-alive and at most 1
\r
513 IotMqtt_Assert( pMqttConnection->references <= 1 );
\r
514 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );
\r
515 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );
\r
516 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );
\r
518 /* Remove all subscriptions. */
\r
519 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
520 IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
\r
521 _mqttSubscription_setUnsubscribe,
\r
523 _mqttSubscription_tryDestroy,
\r
524 offsetof( _mqttSubscription_t, link ) );
\r
525 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
527 /* Destroy an owned network connection. */
\r
528 if( pMqttConnection->ownNetworkConnection == true )
\r
530 networkStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );
\r
532 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
534 IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",
\r
539 IotLogInfo( "(MQTT connection %p) Network connection destroyed.",
\r
548 /* Destroy mutexes. */
\r
549 IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );
\r
550 IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );
\r
552 IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );
\r
554 /* Free connection. */
\r
555 IotMqtt_FreeConnection( pMqttConnection );
\r
558 /*-----------------------------------------------------------*/
\r
560 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
\r
561 IotMqttConnection_t mqttConnection,
\r
562 const IotMqttSubscription_t * pSubscriptionList,
\r
563 size_t subscriptionCount,
\r
565 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
566 IotMqttOperation_t * const pOperationReference )
\r
568 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
569 _mqttOperation_t * pSubscriptionOperation = NULL;
\r
571 /* Subscription serializer function. */
\r
572 IotMqttError_t ( * serializeSubscription )( const IotMqttSubscription_t *,
\r
576 uint16_t * ) = NULL;
\r
578 /* This function should only be called for subscribe or unsubscribe. */
\r
579 IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||
\r
580 ( operation == IOT_MQTT_UNSUBSCRIBE ) );
\r
582 /* Check that all elements in the subscription list are valid. */
\r
583 if( _IotMqtt_ValidateSubscriptionList( operation,
\r
584 mqttConnection->awsIotMqttMode,
\r
586 subscriptionCount ) == false )
\r
588 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
595 /* Check that a reference pointer is provided for a waitable operation. */
\r
596 if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
\r
598 if( pOperationReference == NULL )
\r
600 IotLogError( "Reference must be provided for a waitable %s.",
\r
601 IotMqtt_OperationType( operation ) );
\r
603 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
615 /* Choose a subscription serialize function. */
\r
616 if( operation == IOT_MQTT_SUBSCRIBE )
\r
618 serializeSubscription = _IotMqtt_SerializeSubscribe;
\r
620 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
621 if( mqttConnection->pSerializer != NULL )
\r
623 if( mqttConnection->pSerializer->serialize.subscribe != NULL )
\r
625 serializeSubscription = mqttConnection->pSerializer->serialize.subscribe;
\r
636 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
640 serializeSubscription = _IotMqtt_SerializeUnsubscribe;
\r
642 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
643 if( mqttConnection->pSerializer != NULL )
\r
645 if( mqttConnection->pSerializer->serialize.unsubscribe != NULL )
\r
647 serializeSubscription = mqttConnection->pSerializer->serialize.unsubscribe;
\r
658 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
661 /* Remove the MQTT subscription list for an UNSUBSCRIBE. */
\r
662 if( operation == IOT_MQTT_UNSUBSCRIBE )
\r
664 _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,
\r
666 subscriptionCount );
\r
673 /* Create a subscription operation. */
\r
674 status = _IotMqtt_CreateOperation( mqttConnection,
\r
677 &pSubscriptionOperation );
\r
679 if( status != IOT_MQTT_SUCCESS )
\r
681 IOT_GOTO_CLEANUP();
\r
684 /* Check the subscription operation data and set the operation type. */
\r
685 IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
686 IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );
\r
687 pSubscriptionOperation->u.operation.type = operation;
\r
689 /* Generate a subscription packet from the subscription list. */
\r
690 status = serializeSubscription( pSubscriptionList,
\r
692 &( pSubscriptionOperation->u.operation.pMqttPacket ),
\r
693 &( pSubscriptionOperation->u.operation.packetSize ),
\r
694 &( pSubscriptionOperation->u.operation.packetIdentifier ) );
\r
696 if( status != IOT_MQTT_SUCCESS )
\r
698 IOT_GOTO_CLEANUP();
\r
701 /* Check the serialized MQTT packet. */
\r
702 IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );
\r
703 IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );
\r
705 /* Add the subscription list for a SUBSCRIBE. */
\r
706 if( operation == IOT_MQTT_SUBSCRIBE )
\r
708 status = _IotMqtt_AddSubscriptions( mqttConnection,
\r
709 pSubscriptionOperation->u.operation.packetIdentifier,
\r
711 subscriptionCount );
\r
713 if( status != IOT_MQTT_SUCCESS )
\r
715 IOT_GOTO_CLEANUP();
\r
719 /* Set the reference, if provided. */
\r
720 if( pOperationReference != NULL )
\r
722 *pOperationReference = pSubscriptionOperation;
\r
725 /* Schedule the subscription operation for network transmission. */
\r
726 status = _IotMqtt_ScheduleOperation( pSubscriptionOperation,
\r
727 _IotMqtt_ProcessSend,
\r
730 if( status != IOT_MQTT_SUCCESS )
\r
732 IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",
\r
734 IotMqtt_OperationType( operation ) );
\r
736 if( operation == IOT_MQTT_SUBSCRIBE )
\r
738 _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,
\r
739 pSubscriptionOperation->u.operation.packetIdentifier,
\r
743 /* Clear the previously set (and now invalid) reference. */
\r
744 if( pOperationReference != NULL )
\r
746 *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;
\r
749 IOT_GOTO_CLEANUP();
\r
752 /* Clean up if this function failed. */
\r
753 IOT_FUNCTION_CLEANUP_BEGIN();
\r
755 if( status != IOT_MQTT_SUCCESS )
\r
757 if( pSubscriptionOperation != NULL )
\r
759 _IotMqtt_DestroyOperation( pSubscriptionOperation );
\r
764 status = IOT_MQTT_STATUS_PENDING;
\r
766 IotLogInfo( "(MQTT connection %p) %s operation scheduled.",
\r
768 IotMqtt_OperationType( operation ) );
\r
771 IOT_FUNCTION_CLEANUP_END();
\r
774 /*-----------------------------------------------------------*/
\r
776 bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )
\r
778 bool disconnected = false;
\r
780 /* Lock the mutex protecting the reference count. */
\r
781 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
783 /* Reference count must not be negative. */
\r
784 IotMqtt_Assert( pMqttConnection->references >= 0 );
\r
786 /* Read connection status. */
\r
787 disconnected = pMqttConnection->disconnected;
\r
789 /* Increment the connection's reference count if it is not disconnected. */
\r
790 if( disconnected == false )
\r
792 ( pMqttConnection->references )++;
\r
793 IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",
\r
795 ( long int ) pMqttConnection->references - 1,
\r
796 ( long int ) pMqttConnection->references );
\r
800 IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );
\r
803 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
805 return( disconnected == false );
\r
808 /*-----------------------------------------------------------*/
\r
810 void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )
\r
812 bool destroyConnection = false;
\r
814 /* Lock the mutex protecting the reference count. */
\r
815 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
817 /* Decrement reference count. It must not be negative. */
\r
818 ( pMqttConnection->references )--;
\r
819 IotMqtt_Assert( pMqttConnection->references >= 0 );
\r
821 IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",
\r
823 ( long int ) pMqttConnection->references + 1,
\r
824 ( long int ) pMqttConnection->references );
\r
826 /* Check if this connection may be destroyed. */
\r
827 if( pMqttConnection->references == 0 )
\r
829 destroyConnection = true;
\r
836 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
838 /* Destroy an unreferenced MQTT connection. */
\r
839 if( destroyConnection == true )
\r
841 IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",
\r
843 _destroyMqttConnection( pMqttConnection );
\r
851 /*-----------------------------------------------------------*/
\r
853 IotMqttError_t IotMqtt_Init( void )
\r
855 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
857 /* Call any additional serializer initialization function if serializer
\r
858 * overrides are enabled. */
\r
859 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
860 #ifdef _IotMqtt_InitSerializeAdditional
\r
861 if( _IotMqtt_InitSerializeAdditional() == false )
\r
863 status = IOT_MQTT_INIT_FAILED;
\r
870 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
872 /* Log initialization status. */
\r
873 if( status != IOT_MQTT_SUCCESS )
\r
875 IotLogError( "Failed to initialize MQTT library serializer. " );
\r
879 IotLogInfo( "MQTT library successfully initialized." );
\r
885 /*-----------------------------------------------------------*/
\r
887 void IotMqtt_Cleanup( void )
\r
889 /* Call any additional serializer cleanup initialization function if serializer
\r
890 * overrides are enabled. */
\r
891 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
892 #ifdef _IotMqtt_CleanupSerializeAdditional
\r
893 _IotMqtt_CleanupSerializeAdditional();
\r
897 IotLogInfo( "MQTT library cleanup done." );
\r
900 /*-----------------------------------------------------------*/
\r
902 IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
903 const IotMqttConnectInfo_t * pConnectInfo,
\r
904 uint32_t timeoutMs,
\r
905 IotMqttConnection_t * const pMqttConnection )
\r
907 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
908 bool networkCreated = false, ownNetworkConnection = false;
\r
909 IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
\r
910 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
911 void * pNetworkConnection = NULL;
\r
912 _mqttOperation_t * pOperation = NULL;
\r
913 _mqttConnection_t * pNewMqttConnection = NULL;
\r
915 /* Default CONNECT serializer function. */
\r
916 IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,
\r
918 size_t * ) = _IotMqtt_SerializeConnect;
\r
920 /* Network info must not be NULL. */
\r
921 if( pNetworkInfo == NULL )
\r
923 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
930 /* Validate network interface and connect info. */
\r
931 if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )
\r
933 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
940 /* If will info is provided, check that it is valid. */
\r
941 if( pConnectInfo->pWillInfo != NULL )
\r
943 if( _IotMqtt_ValidatePublish( pConnectInfo->awsIotMqttMode,
\r
944 pConnectInfo->pWillInfo ) == false )
\r
946 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
948 else if( pConnectInfo->pWillInfo->payloadLength > UINT16_MAX )
\r
950 /* Will message payloads cannot be larger than 65535. This restriction
\r
951 * applies only to will messages, and not normal PUBLISH messages. */
\r
952 IotLogError( "Will payload cannot be larger than 65535." );
\r
954 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
966 /* If previous subscriptions are provided, check that they are valid. */
\r
967 if( pConnectInfo->cleanSession == false )
\r
969 if( pConnectInfo->pPreviousSubscriptions != NULL )
\r
971 if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,
\r
972 pConnectInfo->awsIotMqttMode,
\r
973 pConnectInfo->pPreviousSubscriptions,
\r
974 pConnectInfo->previousSubscriptionCount ) == false )
\r
976 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
993 /* Create a new MQTT connection if requested. Otherwise, copy the existing
\r
994 * network connection. */
\r
995 if( pNetworkInfo->createNetworkConnection == true )
\r
997 networkStatus = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,
\r
998 pNetworkInfo->u.setup.pNetworkCredentialInfo,
\r
999 &pNetworkConnection );
\r
1001 if( networkStatus == IOT_NETWORK_SUCCESS )
\r
1003 networkCreated = true;
\r
1005 /* This MQTT connection owns the network connection it created and
\r
1006 * should destroy it on cleanup. */
\r
1007 ownNetworkConnection = true;
\r
1011 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
\r
1016 pNetworkConnection = pNetworkInfo->u.pNetworkConnection;
\r
1017 networkCreated = true;
\r
1020 IotLogInfo( "Establishing new MQTT connection." );
\r
1022 /* Initialize a new MQTT connection object. */
\r
1023 pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,
\r
1025 pConnectInfo->keepAliveSeconds );
\r
1027 if( pNewMqttConnection == NULL )
\r
1029 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
1033 /* Set the network connection associated with the MQTT connection. */
\r
1034 pNewMqttConnection->pNetworkConnection = pNetworkConnection;
\r
1035 pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;
\r
1037 /* Set the MQTT packet serializer overrides. */
\r
1038 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1039 pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;
\r
1043 /* Set the MQTT receive callback. */
\r
1044 networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,
\r
1045 IotMqtt_ReceiveCallback,
\r
1046 pNewMqttConnection );
\r
1048 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
1050 IotLogError( "Failed to set MQTT network receive callback." );
\r
1052 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
\r
1056 EMPTY_ELSE_MARKER;
\r
1059 /* Create a CONNECT operation. */
\r
1060 status = _IotMqtt_CreateOperation( pNewMqttConnection,
\r
1061 IOT_MQTT_FLAG_WAITABLE,
\r
1065 if( status != IOT_MQTT_SUCCESS )
\r
1067 IOT_GOTO_CLEANUP();
\r
1071 EMPTY_ELSE_MARKER;
\r
1074 /* Ensure the members set by operation creation and serialization
\r
1075 * are appropriate for a blocking CONNECT. */
\r
1076 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1077 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
\r
1078 == IOT_MQTT_FLAG_WAITABLE );
\r
1079 IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
\r
1081 /* Set the operation type. */
\r
1082 pOperation->u.operation.type = IOT_MQTT_CONNECT;
\r
1084 /* Add previous session subscriptions. */
\r
1085 if( pConnectInfo->pPreviousSubscriptions != NULL )
\r
1087 /* Previous subscription count should have been validated as nonzero. */
\r
1088 IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );
\r
1090 status = _IotMqtt_AddSubscriptions( pNewMqttConnection,
\r
1092 pConnectInfo->pPreviousSubscriptions,
\r
1093 pConnectInfo->previousSubscriptionCount );
\r
1095 if( status != IOT_MQTT_SUCCESS )
\r
1097 IOT_GOTO_CLEANUP();
\r
1101 EMPTY_ELSE_MARKER;
\r
1106 EMPTY_ELSE_MARKER;
\r
1109 /* Choose a CONNECT serializer function. */
\r
1110 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1111 if( pNewMqttConnection->pSerializer != NULL )
\r
1113 if( pNewMqttConnection->pSerializer->serialize.connect != NULL )
\r
1115 serializeConnect = pNewMqttConnection->pSerializer->serialize.connect;
\r
1119 EMPTY_ELSE_MARKER;
\r
1124 EMPTY_ELSE_MARKER;
\r
1126 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
1128 /* Convert the connect info and will info objects to an MQTT CONNECT packet. */
\r
1129 status = serializeConnect( pConnectInfo,
\r
1130 &( pOperation->u.operation.pMqttPacket ),
\r
1131 &( pOperation->u.operation.packetSize ) );
\r
1133 if( status != IOT_MQTT_SUCCESS )
\r
1135 IOT_GOTO_CLEANUP();
\r
1139 EMPTY_ELSE_MARKER;
\r
1142 /* Check the serialized MQTT packet. */
\r
1143 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1144 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1146 /* Add the CONNECT operation to the send queue for network transmission. */
\r
1147 status = _IotMqtt_ScheduleOperation( pOperation,
\r
1148 _IotMqtt_ProcessSend,
\r
1151 if( status != IOT_MQTT_SUCCESS )
\r
1153 IotLogError( "Failed to enqueue CONNECT for sending." );
\r
1157 /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */
\r
1158 status = IotMqtt_Wait( pOperation,
\r
1161 /* The call to wait cleans up the CONNECT operation, so set the pointer
\r
1163 pOperation = NULL;
\r
1166 /* When a connection is successfully established, schedule keep-alive job. */
\r
1167 if( status == IOT_MQTT_SUCCESS )
\r
1169 /* Check if a keep-alive job should be scheduled. */
\r
1170 if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
\r
1172 IotLogDebug( "Scheduling first MQTT keep-alive job." );
\r
1174 taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
\r
1175 pNewMqttConnection->pingreq.job,
\r
1176 pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );
\r
1178 if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
\r
1180 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );
\r
1184 EMPTY_ELSE_MARKER;
\r
1189 EMPTY_ELSE_MARKER;
\r
1194 EMPTY_ELSE_MARKER;
\r
1197 IOT_FUNCTION_CLEANUP_BEGIN();
\r
1199 if( status != IOT_MQTT_SUCCESS )
\r
1201 IotLogError( "Failed to establish new MQTT connection, error %s.",
\r
1202 IotMqtt_strerror( status ) );
\r
1204 /* The network connection must be closed if it was created. */
\r
1205 if( networkCreated == true )
\r
1207 networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );
\r
1209 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
1211 IotLogWarn( "Failed to close network connection." );
\r
1215 IotLogInfo( "Network connection closed on error." );
\r
1220 EMPTY_ELSE_MARKER;
\r
1223 if( pOperation != NULL )
\r
1225 _IotMqtt_DestroyOperation( pOperation );
\r
1229 EMPTY_ELSE_MARKER;
\r
1232 if( pNewMqttConnection != NULL )
\r
1234 _destroyMqttConnection( pNewMqttConnection );
\r
1238 EMPTY_ELSE_MARKER;
\r
1243 IotLogInfo( "New MQTT connection %p established.", pMqttConnection );
\r
1245 /* Set the output parameter. */
\r
1246 *pMqttConnection = pNewMqttConnection;
\r
1249 IOT_FUNCTION_CLEANUP_END();
\r
1252 /*-----------------------------------------------------------*/
\r
1254 void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
\r
1257 bool disconnected = false;
\r
1258 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1259 _mqttOperation_t * pOperation = NULL;
\r
1261 IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );
\r
1263 /* Read the connection status. */
\r
1264 IotMutex_Lock( &( mqttConnection->referencesMutex ) );
\r
1265 disconnected = mqttConnection->disconnected;
\r
1266 IotMutex_Unlock( &( mqttConnection->referencesMutex ) );
\r
1268 /* Only send a DISCONNECT packet if the connection is active and the "cleanup only"
\r
1269 * flag is not set. */
\r
1270 if( disconnected == false )
\r
1272 if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == 0 )
\r
1274 /* Create a DISCONNECT operation. This function blocks until the DISCONNECT
\r
1275 * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */
\r
1276 status = _IotMqtt_CreateOperation( mqttConnection,
\r
1277 IOT_MQTT_FLAG_WAITABLE,
\r
1281 if( status == IOT_MQTT_SUCCESS )
\r
1283 /* Ensure that the members set by operation creation and serialization
\r
1284 * are appropriate for a blocking DISCONNECT. */
\r
1285 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1286 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
\r
1287 == IOT_MQTT_FLAG_WAITABLE );
\r
1288 IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
\r
1290 /* Set the operation type. */
\r
1291 pOperation->u.operation.type = IOT_MQTT_DISCONNECT;
\r
1293 /* Choose a disconnect serializer. */
\r
1294 IotMqttError_t ( * serializeDisconnect )( uint8_t **,
\r
1295 size_t * ) = _IotMqtt_SerializeDisconnect;
\r
1297 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1298 if( mqttConnection->pSerializer != NULL )
\r
1300 if( mqttConnection->pSerializer->serialize.disconnect != NULL )
\r
1302 serializeDisconnect = mqttConnection->pSerializer->serialize.disconnect;
\r
1306 EMPTY_ELSE_MARKER;
\r
1311 EMPTY_ELSE_MARKER;
\r
1313 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
1315 /* Generate a DISCONNECT packet. */
\r
1316 status = serializeDisconnect( &( pOperation->u.operation.pMqttPacket ),
\r
1317 &( pOperation->u.operation.packetSize ) );
\r
1321 EMPTY_ELSE_MARKER;
\r
1324 if( status == IOT_MQTT_SUCCESS )
\r
1326 /* Check the serialized MQTT packet. */
\r
1327 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1328 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1330 /* Schedule the DISCONNECT operation for network transmission. */
\r
1331 if( _IotMqtt_ScheduleOperation( pOperation,
\r
1332 _IotMqtt_ProcessSend,
\r
1333 0 ) != IOT_MQTT_SUCCESS )
\r
1335 IotLogWarn( "(MQTT connection %p) Failed to schedule DISCONNECT for sending.",
\r
1337 _IotMqtt_DestroyOperation( pOperation );
\r
1341 /* Wait a short time for the DISCONNECT packet to be transmitted. */
\r
1342 status = IotMqtt_Wait( pOperation,
\r
1343 IOT_MQTT_RESPONSE_WAIT_MS );
\r
1345 /* A wait on DISCONNECT should only ever return SUCCESS, TIMEOUT,
\r
1346 * or NETWORK ERROR. */
\r
1347 if( status == IOT_MQTT_SUCCESS )
\r
1349 IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );
\r
1353 IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||
\r
1354 ( status == IOT_MQTT_NETWORK_ERROR ) );
\r
1356 IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",
\r
1358 IotMqtt_strerror( status ) );
\r
1364 EMPTY_ELSE_MARKER;
\r
1369 EMPTY_ELSE_MARKER;
\r
1374 EMPTY_ELSE_MARKER;
\r
1377 /* Close the underlying network connection. This also cleans up keep-alive. */
\r
1378 _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,
\r
1381 /* Check if the connection may be destroyed. */
\r
1382 IotMutex_Lock( &( mqttConnection->referencesMutex ) );
\r
1384 /* At this point, the connection should be marked disconnected. */
\r
1385 IotMqtt_Assert( mqttConnection->disconnected == true );
\r
1387 /* Attempt cancel and destroy each operation in the connection's lists. */
\r
1388 IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),
\r
1389 _mqttOperation_tryDestroy,
\r
1390 offsetof( _mqttOperation_t, link ) );
\r
1392 IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),
\r
1393 _mqttOperation_tryDestroy,
\r
1394 offsetof( _mqttOperation_t, link ) );
\r
1396 IotMutex_Unlock( &( mqttConnection->referencesMutex ) );
\r
1398 /* Decrement the connection reference count and destroy it if possible. */
\r
1399 _IotMqtt_DecrementConnectionReferences( mqttConnection );
\r
1402 /*-----------------------------------------------------------*/
\r
1404 IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
\r
1405 const IotMqttSubscription_t * pSubscriptionList,
\r
1406 size_t subscriptionCount,
\r
1408 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1409 IotMqttOperation_t * const pSubscribeOperation )
\r
1411 return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,
\r
1413 pSubscriptionList,
\r
1414 subscriptionCount,
\r
1417 pSubscribeOperation );
\r
1420 /*-----------------------------------------------------------*/
\r
1422 IotMqttError_t IotMqtt_TimedSubscribe( IotMqttConnection_t mqttConnection,
\r
1423 const IotMqttSubscription_t * pSubscriptionList,
\r
1424 size_t subscriptionCount,
\r
1426 uint32_t timeoutMs )
\r
1428 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1429 IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1431 /* Flags are not used, but the parameter is present for future compatibility. */
\r
1434 /* Call the asynchronous SUBSCRIBE function. */
\r
1435 status = IotMqtt_Subscribe( mqttConnection,
\r
1436 pSubscriptionList,
\r
1437 subscriptionCount,
\r
1438 IOT_MQTT_FLAG_WAITABLE,
\r
1440 &subscribeOperation );
\r
1442 /* Wait for the SUBSCRIBE operation to complete. */
\r
1443 if( status == IOT_MQTT_STATUS_PENDING )
\r
1445 status = IotMqtt_Wait( subscribeOperation, timeoutMs );
\r
1449 EMPTY_ELSE_MARKER;
\r
1452 /* Ensure that a status was set. */
\r
1453 IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );
\r
1458 /*-----------------------------------------------------------*/
\r
1460 IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
\r
1461 const IotMqttSubscription_t * pSubscriptionList,
\r
1462 size_t subscriptionCount,
\r
1464 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1465 IotMqttOperation_t * const pUnsubscribeOperation )
\r
1467 return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,
\r
1469 pSubscriptionList,
\r
1470 subscriptionCount,
\r
1473 pUnsubscribeOperation );
\r
1476 /*-----------------------------------------------------------*/
\r
1478 IotMqttError_t IotMqtt_TimedUnsubscribe( IotMqttConnection_t mqttConnection,
\r
1479 const IotMqttSubscription_t * pSubscriptionList,
\r
1480 size_t subscriptionCount,
\r
1482 uint32_t timeoutMs )
\r
1484 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1485 IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1487 /* Flags are not used, but the parameter is present for future compatibility. */
\r
1490 /* Call the asynchronous UNSUBSCRIBE function. */
\r
1491 status = IotMqtt_Unsubscribe( mqttConnection,
\r
1492 pSubscriptionList,
\r
1493 subscriptionCount,
\r
1494 IOT_MQTT_FLAG_WAITABLE,
\r
1496 &unsubscribeOperation );
\r
1498 /* Wait for the UNSUBSCRIBE operation to complete. */
\r
1499 if( status == IOT_MQTT_STATUS_PENDING )
\r
1501 status = IotMqtt_Wait( unsubscribeOperation, timeoutMs );
\r
1505 EMPTY_ELSE_MARKER;
\r
1508 /* Ensure that a status was set. */
\r
1509 IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );
\r
1514 /*-----------------------------------------------------------*/
\r
1516 IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
\r
1517 const IotMqttPublishInfo_t * pPublishInfo,
\r
1519 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1520 IotMqttOperation_t * const pPublishOperation )
\r
1522 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
1523 _mqttOperation_t * pOperation = NULL;
\r
1524 uint8_t ** pPacketIdentifierHigh = NULL;
\r
1526 /* Default PUBLISH serializer function. */
\r
1527 IotMqttError_t ( * serializePublish )( const IotMqttPublishInfo_t *,
\r
1531 uint8_t ** ) = _IotMqtt_SerializePublish;
\r
1533 /* Check that the PUBLISH information is valid. */
\r
1534 if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,
\r
1535 pPublishInfo ) == false )
\r
1537 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1541 EMPTY_ELSE_MARKER;
\r
1544 /* Check that no notification is requested for a QoS 0 publish. */
\r
1545 if( pPublishInfo->qos == IOT_MQTT_QOS_0 )
\r
1547 if( pCallbackInfo != NULL )
\r
1549 IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );
\r
1551 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1553 else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )
\r
1555 IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );
\r
1557 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1561 EMPTY_ELSE_MARKER;
\r
1564 if( pPublishOperation != NULL )
\r
1566 IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );
\r
1570 EMPTY_ELSE_MARKER;
\r
1575 EMPTY_ELSE_MARKER;
\r
1578 /* Check that a reference pointer is provided for a waitable operation. */
\r
1579 if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
\r
1581 if( pPublishOperation == NULL )
\r
1583 IotLogError( "Reference must be provided for a waitable PUBLISH." );
\r
1585 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1589 EMPTY_ELSE_MARKER;
\r
1594 EMPTY_ELSE_MARKER;
\r
1597 /* Create a PUBLISH operation. */
\r
1598 status = _IotMqtt_CreateOperation( mqttConnection,
\r
1603 if( status != IOT_MQTT_SUCCESS )
\r
1605 IOT_GOTO_CLEANUP();
\r
1609 EMPTY_ELSE_MARKER;
\r
1612 /* Check the PUBLISH operation data and set the operation type. */
\r
1613 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1614 pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;
\r
1616 /* Choose a PUBLISH serializer function. */
\r
1617 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1618 if( mqttConnection->pSerializer != NULL )
\r
1620 if( mqttConnection->pSerializer->serialize.publish != NULL )
\r
1622 serializePublish = mqttConnection->pSerializer->serialize.publish;
\r
1626 EMPTY_ELSE_MARKER;
\r
1631 EMPTY_ELSE_MARKER;
\r
1633 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
1635 /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */
\r
1636 if( mqttConnection->awsIotMqttMode == true )
\r
1638 pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );
\r
1642 EMPTY_ELSE_MARKER;
\r
1645 /* Generate a PUBLISH packet from pPublishInfo. */
\r
1646 status = serializePublish( pPublishInfo,
\r
1647 &( pOperation->u.operation.pMqttPacket ),
\r
1648 &( pOperation->u.operation.packetSize ),
\r
1649 &( pOperation->u.operation.packetIdentifier ),
\r
1650 pPacketIdentifierHigh );
\r
1652 if( status != IOT_MQTT_SUCCESS )
\r
1654 IOT_GOTO_CLEANUP();
\r
1658 EMPTY_ELSE_MARKER;
\r
1661 /* Check the serialized MQTT packet. */
\r
1662 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1663 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1665 /* Initialize PUBLISH retry if retryLimit is set. */
\r
1666 if( pPublishInfo->retryLimit > 0 )
\r
1668 /* A QoS 0 PUBLISH may not be retried. */
\r
1669 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1671 pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;
\r
1672 pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;
\r
1676 EMPTY_ELSE_MARKER;
\r
1681 EMPTY_ELSE_MARKER;
\r
1684 /* Set the reference, if provided. */
\r
1685 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1687 if( pPublishOperation != NULL )
\r
1689 *pPublishOperation = pOperation;
\r
1693 EMPTY_ELSE_MARKER;
\r
1698 EMPTY_ELSE_MARKER;
\r
1701 /* Add the PUBLISH operation to the send queue for network transmission. */
\r
1702 status = _IotMqtt_ScheduleOperation( pOperation,
\r
1703 _IotMqtt_ProcessSend,
\r
1706 if( status != IOT_MQTT_SUCCESS )
\r
1708 IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",
\r
1711 /* Clear the previously set (and now invalid) reference. */
\r
1712 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1714 if( pPublishOperation != NULL )
\r
1716 *pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1720 EMPTY_ELSE_MARKER;
\r
1725 EMPTY_ELSE_MARKER;
\r
1728 IOT_GOTO_CLEANUP();
\r
1732 EMPTY_ELSE_MARKER;
\r
1735 /* Clean up the PUBLISH operation if this function fails. Otherwise, set the
\r
1736 * appropriate return code based on QoS. */
\r
1737 IOT_FUNCTION_CLEANUP_BEGIN();
\r
1739 if( status != IOT_MQTT_SUCCESS )
\r
1741 if( pOperation != NULL )
\r
1743 _IotMqtt_DestroyOperation( pOperation );
\r
1747 EMPTY_ELSE_MARKER;
\r
1752 if( pPublishInfo->qos > IOT_MQTT_QOS_0 )
\r
1754 status = IOT_MQTT_STATUS_PENDING;
\r
1758 EMPTY_ELSE_MARKER;
\r
1761 IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",
\r
1765 IOT_FUNCTION_CLEANUP_END();
\r
1768 /*-----------------------------------------------------------*/
\r
1770 IotMqttError_t IotMqtt_TimedPublish( IotMqttConnection_t mqttConnection,
\r
1771 const IotMqttPublishInfo_t * pPublishInfo,
\r
1773 uint32_t timeoutMs )
\r
1775 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1776 IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER,
\r
1777 * pPublishOperation = NULL;
\r
1779 /* Clear the flags. */
\r
1782 /* Set the waitable flag and reference for QoS 1 PUBLISH. */
\r
1783 if( pPublishInfo->qos == IOT_MQTT_QOS_1 )
\r
1785 flags = IOT_MQTT_FLAG_WAITABLE;
\r
1786 pPublishOperation = &publishOperation;
\r
1790 EMPTY_ELSE_MARKER;
\r
1793 /* Call the asynchronous PUBLISH function. */
\r
1794 status = IotMqtt_Publish( mqttConnection,
\r
1798 pPublishOperation );
\r
1800 /* Wait for a queued QoS 1 PUBLISH to complete. */
\r
1801 if( pPublishInfo->qos == IOT_MQTT_QOS_1 )
\r
1803 if( status == IOT_MQTT_STATUS_PENDING )
\r
1805 status = IotMqtt_Wait( publishOperation, timeoutMs );
\r
1809 EMPTY_ELSE_MARKER;
\r
1814 EMPTY_ELSE_MARKER;
\r
1820 /*-----------------------------------------------------------*/
\r
1822 IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,
\r
1823 uint32_t timeoutMs )
\r
1825 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
1826 _mqttConnection_t * pMqttConnection = operation->pMqttConnection;
\r
1828 /* Validate the given operation reference. */
\r
1829 if( _IotMqtt_ValidateOperation( operation ) == false )
\r
1831 status = IOT_MQTT_BAD_PARAMETER;
\r
1835 EMPTY_ELSE_MARKER;
\r
1838 /* Check the MQTT connection status. */
\r
1839 if( status == IOT_MQTT_SUCCESS )
\r
1841 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
1843 if( pMqttConnection->disconnected == true )
\r
1845 IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "
\r
1846 "Operation cannot be waited on.",
\r
1848 IotMqtt_OperationType( operation->u.operation.type ),
\r
1851 status = IOT_MQTT_NETWORK_ERROR;
\r
1855 IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",
\r
1857 IotMqtt_OperationType( operation->u.operation.type ),
\r
1861 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
1863 /* Only wait on an operation if the MQTT connection is active. */
\r
1864 if( status == IOT_MQTT_SUCCESS )
\r
1866 if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),
\r
1867 timeoutMs ) == false )
\r
1869 status = IOT_MQTT_TIMEOUT;
\r
1871 /* Attempt to cancel the job of the timed out operation. */
\r
1872 ( void ) _IotMqtt_DecrementOperationReferences( operation, true );
\r
1874 /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */
\r
1875 if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )
\r
1877 IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"
\r
1878 " subscriptions of timed-out SUBSCRIBE.",
\r
1882 _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,
\r
1883 operation->u.operation.packetIdentifier,
\r
1888 EMPTY_ELSE_MARKER;
\r
1893 /* Retrieve the status of the completed operation. */
\r
1894 status = operation->u.operation.status;
\r
1897 IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",
\r
1899 IotMqtt_OperationType( operation->u.operation.type ),
\r
1901 IotMqtt_strerror( status ) );
\r
1905 EMPTY_ELSE_MARKER;
\r
1908 /* Wait is finished; decrement operation reference count. */
\r
1909 if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )
\r
1911 _IotMqtt_DestroyOperation( operation );
\r
1915 EMPTY_ELSE_MARKER;
\r
1920 EMPTY_ELSE_MARKER;
\r
1926 /*-----------------------------------------------------------*/
\r
1928 const char * IotMqtt_strerror( IotMqttError_t status )
\r
1930 const char * pMessage = NULL;
\r
1934 case IOT_MQTT_SUCCESS:
\r
1935 pMessage = "SUCCESS";
\r
1938 case IOT_MQTT_STATUS_PENDING:
\r
1939 pMessage = "PENDING";
\r
1942 case IOT_MQTT_INIT_FAILED:
\r
1943 pMessage = "INITIALIZATION FAILED";
\r
1946 case IOT_MQTT_BAD_PARAMETER:
\r
1947 pMessage = "BAD PARAMETER";
\r
1950 case IOT_MQTT_NO_MEMORY:
\r
1951 pMessage = "NO MEMORY";
\r
1954 case IOT_MQTT_NETWORK_ERROR:
\r
1955 pMessage = "NETWORK ERROR";
\r
1958 case IOT_MQTT_SCHEDULING_ERROR:
\r
1959 pMessage = "SCHEDULING ERROR";
\r
1962 case IOT_MQTT_BAD_RESPONSE:
\r
1963 pMessage = "BAD RESPONSE RECEIVED";
\r
1966 case IOT_MQTT_TIMEOUT:
\r
1967 pMessage = "TIMEOUT";
\r
1970 case IOT_MQTT_SERVER_REFUSED:
\r
1971 pMessage = "SERVER REFUSED";
\r
1974 case IOT_MQTT_RETRY_NO_RESPONSE:
\r
1975 pMessage = "NO RESPONSE";
\r
1979 pMessage = "INVALID STATUS";
\r
1986 /*-----------------------------------------------------------*/
\r
1988 const char * IotMqtt_OperationType( IotMqttOperationType_t operation )
\r
1990 const char * pMessage = NULL;
\r
1992 switch( operation )
\r
1994 case IOT_MQTT_CONNECT:
\r
1995 pMessage = "CONNECT";
\r
1998 case IOT_MQTT_PUBLISH_TO_SERVER:
\r
1999 pMessage = "PUBLISH";
\r
2002 case IOT_MQTT_PUBACK:
\r
2003 pMessage = "PUBACK";
\r
2006 case IOT_MQTT_SUBSCRIBE:
\r
2007 pMessage = "SUBSCRIBE";
\r
2010 case IOT_MQTT_UNSUBSCRIBE:
\r
2011 pMessage = "UNSUBSCRIBE";
\r
2014 case IOT_MQTT_PINGREQ:
\r
2015 pMessage = "PINGREQ";
\r
2018 case IOT_MQTT_DISCONNECT:
\r
2019 pMessage = "DISCONNECT";
\r
2023 pMessage = "INVALID OPERATION";
\r
2030 /*-----------------------------------------------------------*/
\r
2032 /* Provide access to internal functions and variables if testing. */
\r
2033 #if IOT_BUILD_TESTS == 1
\r
2034 #include "iot_test_access_mqtt_api.c"
\r