2 * Amazon FreeRTOS MQTT V2.0.0
\r
3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://aws.amazon.com/freertos
\r
23 * http://www.FreeRTOS.org
\r
27 * @file iot_mqtt_api.c
\r
28 * @brief Implements most user-facing functions of the MQTT library.
\r
31 /* The config header is always included first. */
\r
32 #include "iot_config.h"
\r
34 /* Standard includes. */
\r
37 /* Error handling include. */
\r
38 #include "private/iot_error.h"
\r
40 /* MQTT internal include. */
\r
41 #include "private/iot_mqtt_internal.h"
\r
43 /* Platform layer includes. */
\r
44 #include "platform/iot_clock.h"
\r
45 #include "platform/iot_threads.h"
\r
47 /* Validate MQTT configuration settings. */
\r
48 #if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1
\r
49 #error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1."
\r
51 #if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1
\r
52 #error "IOT_MQTT_ENABLE_METRICS must be 0 or 1."
\r
54 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1
\r
55 #error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1."
\r
57 #if IOT_MQTT_RESPONSE_WAIT_MS <= 0
\r
58 #error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative."
\r
60 #if IOT_MQTT_RETRY_MS_CEILING <= 0
\r
61 #error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative."
\r
64 /*-----------------------------------------------------------*/
\r
67 * @brief Set the unsubscribed flag of an MQTT subscription.
\r
69 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
70 * @param[in] pMatch Not used.
\r
72 * @return Always returns `true`.
\r
74 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,
\r
78 * @brief Destroy an MQTT subscription if its reference count is 0.
\r
80 * @param[in] pData The subscription to destroy. This parameter is of type
\r
81 * `void*` for compatibility with [free]
\r
82 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
84 static void _mqttSubscription_tryDestroy( void * pData );
\r
87 * @brief Decrement the reference count of an MQTT operation and attempt to destroy it.
\r
90 * @param[in] pData The operation data to destroy. This parameter is of type
\r
91 * `void*` for compatibility with [free]
\r
92 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
94 static void _mqttOperation_tryDestroy( void * pData );
\r
97 * @brief Create a keep-alive job for an MQTT connection.
\r
99 * @param[in] pNetworkInfo User-provided network information for the new connection.
\r
101 * @param[in] keepAliveSeconds User-provided keep-alive interval.
\r
102 * @param[out] pMqttConnection The MQTT connection associated with the keep-alive.
\r
104 * @return `true` if the keep-alive job was successfully created; `false` otherwise.
\r
106 static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
107 uint16_t keepAliveSeconds,
\r
108 _mqttConnection_t * pMqttConnection );
\r
111 * @brief Creates a new MQTT connection and initializes its members.
\r
113 * @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server.
\r
114 * @param[in] pNetworkInfo User-provided network information for the new connection.
\r
116 * @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection.
\r
118 * @return Pointer to a newly-created MQTT connection; `NULL` on failure.
\r
120 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
\r
121 const IotMqttNetworkInfo_t * pNetworkInfo,
\r
122 uint16_t keepAliveSeconds );
\r
125 * @brief Destroys the members of an MQTT connection.
\r
127 * @param[in] pMqttConnection Which connection to destroy.
\r
129 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection );
\r
132 * @brief The common component of both @ref mqtt_function_subscribe and @ref
\r
133 * mqtt_function_unsubscribe.
\r
135 * See @ref mqtt_function_subscribe or @ref mqtt_function_unsubscribe for a
\r
136 * description of the parameters and return values.
\r
138 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
\r
139 IotMqttConnection_t mqttConnection,
\r
140 const IotMqttSubscription_t * pSubscriptionList,
\r
141 size_t subscriptionCount,
\r
143 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
144 IotMqttOperation_t * pOperationReference );
\r
146 /*-----------------------------------------------------------*/
\r
148 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,
\r
151 /* Because this function is called from a container function, the given link
\r
152 * must never be NULL. */
\r
153 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
155 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
159 /* Silence warnings about unused parameters. */
\r
162 /* Set the unsubscribed flag. */
\r
163 pSubscription->unsubscribed = true;
\r
168 /*-----------------------------------------------------------*/
\r
170 static void _mqttSubscription_tryDestroy( void * pData )
\r
172 _mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData;
\r
174 /* Reference count must not be negative. */
\r
175 IotMqtt_Assert( pSubscription->references >= 0 );
\r
177 /* Unsubscribed flag should be set. */
\r
178 IotMqtt_Assert( pSubscription->unsubscribed == true );
\r
180 /* Free the subscription if it has no references. */
\r
181 if( pSubscription->references == 0 )
\r
183 IotMqtt_FreeSubscription( pSubscription );
\r
191 /*-----------------------------------------------------------*/
\r
193 static void _mqttOperation_tryDestroy( void * pData )
\r
195 _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData;
\r
196 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
198 /* Incoming PUBLISH operations may always be freed. */
\r
199 if( pOperation->incomingPublish == true )
\r
201 /* Cancel the incoming PUBLISH operation's job. */
\r
202 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
206 /* If the operation's job was not canceled, it must be already executing.
\r
207 * Any other return value is invalid. */
\r
208 IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
\r
209 ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
\r
211 /* Check if the incoming PUBLISH job was canceled. */
\r
212 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
214 /* Job was canceled. Process incoming PUBLISH now to clean up. */
\r
215 _IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL,
\r
221 /* The executing job will process the PUBLISH, so nothing is done here. */
\r
227 /* Decrement reference count and destroy operation if possible. */
\r
228 if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true )
\r
230 _IotMqtt_DestroyOperation( pOperation );
\r
239 /*-----------------------------------------------------------*/
\r
241 static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
242 uint16_t keepAliveSeconds,
\r
243 _mqttConnection_t * pMqttConnection )
\r
245 bool status = true;
\r
246 IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
\r
247 IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS;
\r
249 /* Network information is not used when MQTT packet serializers are disabled. */
\r
250 ( void ) pNetworkInfo;
\r
252 /* Default PINGREQ serializer function. */
\r
253 IotMqttError_t ( * serializePingreq )( uint8_t **,
\r
254 size_t * ) = _IotMqtt_SerializePingreq;
\r
256 /* Convert the keep-alive interval to milliseconds. */
\r
257 pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;
\r
258 pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;
\r
260 /* Choose a PINGREQ serializer function. */
\r
261 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
262 if( pNetworkInfo->pMqttSerializer != NULL )
\r
264 if( pNetworkInfo->pMqttSerializer->serialize.pingreq != NULL )
\r
266 serializePingreq = pNetworkInfo->pMqttSerializer->serialize.pingreq;
\r
277 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
279 /* Generate a PINGREQ packet. */
\r
280 serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),
\r
281 &( pMqttConnection->pingreqPacketSize ) );
\r
283 if( serializeStatus != IOT_MQTT_SUCCESS )
\r
285 IotLogError( "Failed to allocate PINGREQ packet for new connection." );
\r
291 /* Create the task pool job that processes keep-alive. */
\r
292 jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
\r
294 &( pMqttConnection->keepAliveJobStorage ),
\r
295 &( pMqttConnection->keepAliveJob ) );
\r
297 /* Task pool job creation for a pre-allocated job should never fail.
\r
298 * Abort the program if it does. */
\r
299 if( jobStatus != IOT_TASKPOOL_SUCCESS )
\r
301 IotLogError( "Failed to create keep-alive job for new connection." );
\r
303 IotMqtt_Assert( false );
\r
310 /* Keep-alive references its MQTT connection, so increment reference. */
\r
311 ( pMqttConnection->references )++;
\r
317 /*-----------------------------------------------------------*/
\r
319 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
\r
320 const IotMqttNetworkInfo_t * pNetworkInfo,
\r
321 uint16_t keepAliveSeconds )
\r
323 IOT_FUNCTION_ENTRY( bool, true );
\r
324 _mqttConnection_t * pMqttConnection = NULL;
\r
325 bool referencesMutexCreated = false, subscriptionMutexCreated = false;
\r
327 /* Allocate memory for the new MQTT connection. */
\r
328 pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) );
\r
330 if( pMqttConnection == NULL )
\r
332 IotLogError( "Failed to allocate memory for new connection." );
\r
334 IOT_SET_AND_GOTO_CLEANUP( false );
\r
338 /* Clear the MQTT connection, then copy the MQTT server mode, network
\r
339 * interface, and disconnect callback. */
\r
340 ( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) );
\r
341 pMqttConnection->awsIotMqttMode = awsIotMqttMode;
\r
342 pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface;
\r
343 pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback;
\r
345 /* Start a new MQTT connection with a reference count of 1. */
\r
346 pMqttConnection->references = 1;
\r
349 /* Create the references mutex for a new connection. It is a recursive mutex. */
\r
350 referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true );
\r
352 if( referencesMutexCreated == false )
\r
354 IotLogError( "Failed to create references mutex for new connection." );
\r
356 IOT_SET_AND_GOTO_CLEANUP( false );
\r
363 /* Create the subscription mutex for a new connection. */
\r
364 subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false );
\r
366 if( subscriptionMutexCreated == false )
\r
368 IotLogError( "Failed to create subscription mutex for new connection." );
\r
370 IOT_SET_AND_GOTO_CLEANUP( false );
\r
377 /* Create the new connection's subscription and operation lists. */
\r
378 IotListDouble_Create( &( pMqttConnection->subscriptionList ) );
\r
379 IotListDouble_Create( &( pMqttConnection->pendingProcessing ) );
\r
380 IotListDouble_Create( &( pMqttConnection->pendingResponse ) );
\r
382 /* AWS IoT service limits set minimum and maximum values for keep-alive interval.
\r
383 * Adjust the user-provided keep-alive interval based on these requirements. */
\r
384 if( awsIotMqttMode == true )
\r
386 if( keepAliveSeconds < AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE )
\r
388 keepAliveSeconds = AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE;
\r
390 else if( keepAliveSeconds > AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE )
\r
392 keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;
\r
394 else if( keepAliveSeconds == 0 )
\r
396 keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;
\r
408 /* Check if keep-alive is active for this connection. */
\r
409 if( keepAliveSeconds != 0 )
\r
411 if( _createKeepAliveJob( pNetworkInfo,
\r
413 pMqttConnection ) == false )
\r
415 IOT_SET_AND_GOTO_CLEANUP( false );
\r
427 /* Clean up mutexes and connection if this function failed. */
\r
428 IOT_FUNCTION_CLEANUP_BEGIN();
\r
430 if( status == false )
\r
432 if( subscriptionMutexCreated == true )
\r
434 IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );
\r
441 if( referencesMutexCreated == true )
\r
443 IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );
\r
450 if( pMqttConnection != NULL )
\r
452 IotMqtt_FreeConnection( pMqttConnection );
\r
453 pMqttConnection = NULL;
\r
465 return pMqttConnection;
\r
468 /*-----------------------------------------------------------*/
\r
470 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
\r
472 IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
\r
474 /* Clean up keep-alive if still allocated. */
\r
475 if( pMqttConnection->keepAliveMs != 0 )
\r
477 IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );
\r
479 _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
\r
481 /* Clear data about the keep-alive. */
\r
482 pMqttConnection->keepAliveMs = 0;
\r
483 pMqttConnection->pPingreqPacket = NULL;
\r
484 pMqttConnection->pingreqPacketSize = 0;
\r
486 /* Decrement reference count. */
\r
487 pMqttConnection->references--;
\r
494 /* A connection to be destroyed should have no keep-alive and at most 1 reference. */
\r
496 IotMqtt_Assert( pMqttConnection->references <= 1 );
\r
497 IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );
\r
498 IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );
\r
499 IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );
\r
501 /* Remove all subscriptions. */
\r
502 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
503 IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
\r
504 _mqttSubscription_setUnsubscribe,
\r
506 _mqttSubscription_tryDestroy,
\r
507 offsetof( _mqttSubscription_t, link ) );
\r
508 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
510 /* Destroy an owned network connection. */
\r
511 if( pMqttConnection->ownNetworkConnection == true )
\r
513 networkStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );
\r
515 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
517 IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",
\r
522 IotLogInfo( "(MQTT connection %p) Network connection destroyed.",
\r
531 /* Destroy mutexes. */
\r
532 IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );
\r
533 IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );
\r
535 IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );
\r
537 /* Free connection. */
\r
538 IotMqtt_FreeConnection( pMqttConnection );
\r
541 /*-----------------------------------------------------------*/
\r
543 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
\r
544 IotMqttConnection_t mqttConnection,
\r
545 const IotMqttSubscription_t * pSubscriptionList,
\r
546 size_t subscriptionCount,
\r
548 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
549 IotMqttOperation_t * pOperationReference )
\r
551 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
552 _mqttOperation_t * pSubscriptionOperation = NULL;
\r
554 /* Subscription serializer function. */
\r
555 IotMqttError_t ( * serializeSubscription )( const IotMqttSubscription_t *,
\r
559 uint16_t * ) = NULL;
\r
561 /* This function should only be called for subscribe or unsubscribe. */
\r
562 IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||
\r
563 ( operation == IOT_MQTT_UNSUBSCRIBE ) );
\r
565 /* Check that all elements in the subscription list are valid. */
\r
566 if( _IotMqtt_ValidateSubscriptionList( operation,
\r
567 mqttConnection->awsIotMqttMode,
\r
569 subscriptionCount ) == false )
\r
571 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
578 /* Check that a reference pointer is provided for a waitable operation. */
\r
579 if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
\r
581 if( pOperationReference == NULL )
\r
583 IotLogError( "Reference must be provided for a waitable %s.",
\r
584 IotMqtt_OperationType( operation ) );
\r
586 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
598 /* Choose a subscription serialize function. */
\r
599 if( operation == IOT_MQTT_SUBSCRIBE )
\r
601 serializeSubscription = _IotMqtt_SerializeSubscribe;
\r
603 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
604 if( mqttConnection->pSerializer != NULL )
\r
606 if( mqttConnection->pSerializer->serialize.subscribe != NULL )
\r
608 serializeSubscription = mqttConnection->pSerializer->serialize.subscribe;
\r
619 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
623 serializeSubscription = _IotMqtt_SerializeUnsubscribe;
\r
625 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
626 if( mqttConnection->pSerializer != NULL )
\r
628 if( mqttConnection->pSerializer->serialize.unsubscribe != NULL )
\r
630 serializeSubscription = mqttConnection->pSerializer->serialize.unsubscribe;
\r
641 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
644 /* Remove the MQTT subscription list for an UNSUBSCRIBE. */
\r
645 if( operation == IOT_MQTT_UNSUBSCRIBE )
\r
647 _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,
\r
649 subscriptionCount );
\r
656 /* Create a subscription operation. */
\r
657 status = _IotMqtt_CreateOperation( mqttConnection,
\r
660 &pSubscriptionOperation );
\r
662 if( status != IOT_MQTT_SUCCESS )
\r
664 IOT_GOTO_CLEANUP();
\r
667 /* Check the subscription operation data and set the operation type. */
\r
668 IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
669 IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );
\r
670 pSubscriptionOperation->u.operation.type = operation;
\r
672 /* Generate a subscription packet from the subscription list. */
\r
673 status = serializeSubscription( pSubscriptionList,
\r
675 &( pSubscriptionOperation->u.operation.pMqttPacket ),
\r
676 &( pSubscriptionOperation->u.operation.packetSize ),
\r
677 &( pSubscriptionOperation->u.operation.packetIdentifier ) );
\r
679 if( status != IOT_MQTT_SUCCESS )
\r
681 IOT_GOTO_CLEANUP();
\r
684 /* Check the serialized MQTT packet. */
\r
685 IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );
\r
686 IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );
\r
688 /* Add the subscription list for a SUBSCRIBE. */
\r
689 if( operation == IOT_MQTT_SUBSCRIBE )
\r
691 status = _IotMqtt_AddSubscriptions( mqttConnection,
\r
692 pSubscriptionOperation->u.operation.packetIdentifier,
\r
694 subscriptionCount );
\r
696 if( status != IOT_MQTT_SUCCESS )
\r
698 IOT_GOTO_CLEANUP();
\r
702 /* Set the reference, if provided. */
\r
703 if( pOperationReference != NULL )
\r
705 *pOperationReference = pSubscriptionOperation;
\r
708 /* Schedule the subscription operation for network transmission. */
\r
709 status = _IotMqtt_ScheduleOperation( pSubscriptionOperation,
\r
710 _IotMqtt_ProcessSend,
\r
713 if( status != IOT_MQTT_SUCCESS )
\r
715 IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",
\r
717 IotMqtt_OperationType( operation ) );
\r
719 if( operation == IOT_MQTT_SUBSCRIBE )
\r
721 _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,
\r
722 pSubscriptionOperation->u.operation.packetIdentifier,
\r
726 /* Clear the previously set (and now invalid) reference. */
\r
727 if( pOperationReference != NULL )
\r
729 *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;
\r
732 IOT_GOTO_CLEANUP();
\r
735 /* Clean up if this function failed. */
\r
736 IOT_FUNCTION_CLEANUP_BEGIN();
\r
738 if( status != IOT_MQTT_SUCCESS )
\r
740 if( pSubscriptionOperation != NULL )
\r
742 _IotMqtt_DestroyOperation( pSubscriptionOperation );
\r
747 status = IOT_MQTT_STATUS_PENDING;
\r
749 IotLogInfo( "(MQTT connection %p) %s operation scheduled.",
\r
751 IotMqtt_OperationType( operation ) );
\r
754 IOT_FUNCTION_CLEANUP_END();
\r
757 /*-----------------------------------------------------------*/
\r
759 bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )
\r
761 bool disconnected = false;
\r
763 /* Lock the mutex protecting the reference count. */
\r
764 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
766 /* Reference count must not be negative. */
\r
767 IotMqtt_Assert( pMqttConnection->references >= 0 );
\r
769 /* Read connection status. */
\r
770 disconnected = pMqttConnection->disconnected;
\r
772 /* Increment the connection's reference count if it is not disconnected. */
\r
773 if( disconnected == false )
\r
775 ( pMqttConnection->references )++;
\r
776 IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",
\r
778 ( long int ) pMqttConnection->references - 1,
\r
779 ( long int ) pMqttConnection->references );
\r
783 IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );
\r
786 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
788 return( disconnected == false );
\r
791 /*-----------------------------------------------------------*/
\r
793 void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )
\r
795 bool destroyConnection = false;
\r
797 /* Lock the mutex protecting the reference count. */
\r
798 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
800 /* Decrement reference count. It must not be negative. */
\r
801 ( pMqttConnection->references )--;
\r
802 IotMqtt_Assert( pMqttConnection->references >= 0 );
\r
804 IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",
\r
806 ( long int ) pMqttConnection->references + 1,
\r
807 ( long int ) pMqttConnection->references );
\r
809 /* Check if this connection may be destroyed. */
\r
810 if( pMqttConnection->references == 0 )
\r
812 destroyConnection = true;
\r
819 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
821 /* Destroy an unreferenced MQTT connection. */
\r
822 if( destroyConnection == true )
\r
824 IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",
\r
826 _destroyMqttConnection( pMqttConnection );
\r
834 /*-----------------------------------------------------------*/
\r
836 IotMqttError_t IotMqtt_Init( void )
\r
838 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
840 /* Call any additional serializer initialization function if serializer
\r
841 * overrides are enabled. */
\r
842 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
843 #ifdef _IotMqtt_InitSerializeAdditional
\r
844 if( _IotMqtt_InitSerializeAdditional() == false )
\r
846 status = IOT_MQTT_INIT_FAILED;
\r
853 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
855 /* Log initialization status. */
\r
856 if( status != IOT_MQTT_SUCCESS ) //_RB_ This will generate compiler warnings if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0
\r
858 IotLogError( "Failed to initialize MQTT library serializer. " );
\r
862 IotLogInfo( "MQTT library successfully initialized." );
\r
868 /*-----------------------------------------------------------*/
\r
870 void IotMqtt_Cleanup( void )
\r
872 /* Call any additional serializer cleanup function if serializer
\r
873 * overrides are enabled. */
\r
874 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
875 #ifdef _IotMqtt_CleanupSerializeAdditional
\r
876 _IotMqtt_CleanupSerializeAdditional();
\r
880 IotLogInfo( "MQTT library cleanup done." );
\r
883 /*-----------------------------------------------------------*/
\r
885 IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
886 const IotMqttConnectInfo_t * pConnectInfo,
\r
887 uint32_t timeoutMs,
\r
888 IotMqttConnection_t * const pMqttConnection )
\r
890 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
891 bool networkCreated = false, ownNetworkConnection = false;
\r
892 IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
\r
893 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
894 void * pNetworkConnection = NULL;
\r
895 _mqttOperation_t * pOperation = NULL;
\r
896 _mqttConnection_t * pNewMqttConnection = NULL;
\r
898 /* Default CONNECT serializer function. */
\r
899 IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, //_RB_ Needs to be a typedef to make it easier to read and more maintainable should the prototype change.
\r
901 size_t * ) = _IotMqtt_SerializeConnect;
\r
903 /* Network info must not be NULL. */
\r
904 if( pNetworkInfo == NULL )
\r
906 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
913 /* Validate network interface and connect info. */
\r
914 if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) //_RB_ A lot of code in here that could be replaced by asserts().
\r
916 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
923 /* If will info is provided, check that it is valid. */
\r
924 if( pConnectInfo->pWillInfo != NULL )
\r
926 if( _IotMqtt_ValidatePublish( pConnectInfo->awsIotMqttMode,
\r
927 pConnectInfo->pWillInfo ) == false )
\r
929 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
931 else if( pConnectInfo->pWillInfo->payloadLength > UINT16_MAX )
\r
933 /* Will message payloads cannot be larger than 65535. This restriction
\r
934 * applies only to will messages, and not normal PUBLISH messages. */
\r
935 IotLogError( "Will payload cannot be larger than 65535." );
\r
937 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
949 /* If previous subscriptions are provided, check that they are valid. */
\r
950 if( pConnectInfo->cleanSession == false )
\r
952 if( pConnectInfo->pPreviousSubscriptions != NULL )
\r
954 if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,
\r
955 pConnectInfo->awsIotMqttMode,
\r
956 pConnectInfo->pPreviousSubscriptions,
\r
957 pConnectInfo->previousSubscriptionCount ) == false )
\r
959 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
976 /* Create a new network connection if requested. Otherwise, copy the existing
\r
977 * network connection. */
\r
978 if( pNetworkInfo->createNetworkConnection == true )
\r
980 networkStatus = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,
\r
981 pNetworkInfo->u.setup.pNetworkCredentialInfo,
\r
982 &pNetworkConnection );
\r
984 if( networkStatus == IOT_NETWORK_SUCCESS )
\r
986 networkCreated = true;
\r
988 /* This MQTT connection owns the network connection it created and
\r
989 * should destroy it on cleanup. */
\r
990 ownNetworkConnection = true;
\r
994 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
\r
999 pNetworkConnection = pNetworkInfo->u.pNetworkConnection;
\r
1000 networkCreated = true;
\r
1003 IotLogInfo( "Establishing new MQTT connection." );
\r
1005 /* Initialize a new MQTT connection object. *///_RB_ Initialise, as per the comment, or create, as per the function name? I don't think this does create a connection as that happens below.
\r
1006 pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,
\r
1008 pConnectInfo->keepAliveSeconds );
\r
1010 if( pNewMqttConnection == NULL )
\r
1012 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
1016 /* Set the network connection associated with the MQTT connection. */
\r
1017 pNewMqttConnection->pNetworkConnection = pNetworkConnection;
\r
1018 pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;
\r
1020 /* Set the MQTT packet serializer overrides. */
\r
1021 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1022 pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;
\r
1026 /* Set the MQTT receive callback. */
\r
1027 networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,
\r
1028 IotMqtt_ReceiveCallback,
\r
1029 pNewMqttConnection );
\r
1031 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
1033 IotLogError( "Failed to set MQTT network receive callback." );
\r
1035 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
\r
1039 EMPTY_ELSE_MARKER;
\r
1042 /* Create a CONNECT operation. */
\r
1043 status = _IotMqtt_CreateOperation( pNewMqttConnection,
\r
1044 IOT_MQTT_FLAG_WAITABLE,
\r
1048 if( status != IOT_MQTT_SUCCESS )
\r
1050 IOT_GOTO_CLEANUP();
\r
1054 EMPTY_ELSE_MARKER;
\r
1057 /* Ensure the members set by operation creation and serialization
\r
1058 * are appropriate for a blocking CONNECT. */
\r
1059 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1060 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
\r
1061 == IOT_MQTT_FLAG_WAITABLE );
\r
1062 IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
\r
1064 /* Set the operation type. */
\r
1065 pOperation->u.operation.type = IOT_MQTT_CONNECT;
\r
1067 /* Add previous session subscriptions. */
\r
1068 if( pConnectInfo->pPreviousSubscriptions != NULL )
\r
1070 /* Previous subscription count should have been validated as nonzero. */
\r
1071 IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );
\r
1073 status = _IotMqtt_AddSubscriptions( pNewMqttConnection,
\r
1075 pConnectInfo->pPreviousSubscriptions,
\r
1076 pConnectInfo->previousSubscriptionCount );
\r
1078 if( status != IOT_MQTT_SUCCESS )
\r
1080 IOT_GOTO_CLEANUP();
\r
1084 EMPTY_ELSE_MARKER;
\r
1089 EMPTY_ELSE_MARKER;
\r
1092 /* Choose a CONNECT serializer function. */
\r
1093 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1094 if( pNewMqttConnection->pSerializer != NULL )
\r
1096 if( pNewMqttConnection->pSerializer->serialize.connect != NULL )
\r
1098 serializeConnect = pNewMqttConnection->pSerializer->serialize.connect;
\r
1102 EMPTY_ELSE_MARKER;
\r
1107 EMPTY_ELSE_MARKER;
\r
1109 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
1111 /* Convert the connect info and will info objects to an MQTT CONNECT packet. */
\r
1112 status = serializeConnect( pConnectInfo,
\r
1113 &( pOperation->u.operation.pMqttPacket ),
\r
1114 &( pOperation->u.operation.packetSize ) );
\r
1116 if( status != IOT_MQTT_SUCCESS )
\r
1118 IOT_GOTO_CLEANUP();
\r
1122 EMPTY_ELSE_MARKER;
\r
1125 /* Check the serialized MQTT packet. */
\r
1126 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1127 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1129 /* Add the CONNECT operation to the send queue for network transmission. */
\r
1130 status = _IotMqtt_ScheduleOperation( pOperation, // Why schedule a job if going to wait for completion?
\r
1131 _IotMqtt_ProcessSend,
\r
1134 if( status != IOT_MQTT_SUCCESS )
\r
1136 IotLogError( "Failed to enqueue CONNECT for sending." );
\r
1140 /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */
\r
1141 status = IotMqtt_Wait( pOperation,
\r
1144 /* The call to wait cleans up the CONNECT operation, so set the pointer to NULL. */
\r
1146 pOperation = NULL;
\r
1149 /* When a connection is successfully established, schedule keep-alive job. */
\r
1150 if( status == IOT_MQTT_SUCCESS )
\r
1152 /* Check if a keep-alive job should be scheduled. */
\r
1153 if( pNewMqttConnection->keepAliveMs != 0 )
\r
1155 IotLogDebug( "Scheduling first MQTT keep-alive job." );
\r
1157 taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
\r
1158 pNewMqttConnection->keepAliveJob,
\r
1159 pNewMqttConnection->nextKeepAliveMs );
\r
1161 if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
\r
1163 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );
\r
1167 EMPTY_ELSE_MARKER;
\r
1172 EMPTY_ELSE_MARKER;
\r
1177 EMPTY_ELSE_MARKER;
\r
1180 IOT_FUNCTION_CLEANUP_BEGIN();
\r
1182 if( status != IOT_MQTT_SUCCESS )
\r
1184 IotLogError( "Failed to establish new MQTT connection, error %s.",
\r
1185 IotMqtt_strerror( status ) );
\r
1187 /* The network connection must be closed if it was created. */
\r
1188 if( networkCreated == true )
\r
1190 networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );
\r
1192 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
1194 IotLogWarn( "Failed to close network connection." );
\r
1198 IotLogInfo( "Network connection closed on error." );
\r
1203 EMPTY_ELSE_MARKER;
\r
1206 if( pOperation != NULL )
\r
1208 _IotMqtt_DestroyOperation( pOperation );
\r
1212 EMPTY_ELSE_MARKER;
\r
1215 if( pNewMqttConnection != NULL )
\r
1217 _destroyMqttConnection( pNewMqttConnection );
\r
1221 EMPTY_ELSE_MARKER;
\r
1226 IotLogInfo( "New MQTT connection %p established.", pMqttConnection );
\r
1228 /* Set the output parameter. */
\r
1229 *pMqttConnection = pNewMqttConnection;
\r
1232 IOT_FUNCTION_CLEANUP_END();
\r
1235 /*-----------------------------------------------------------*/
\r
1237 void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
\r
1240 bool disconnected = false;
\r
1241 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1242 _mqttOperation_t * pOperation = NULL;
\r
1244 IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );
\r
1246 /* Read the connection status. */
\r
1247 IotMutex_Lock( &( mqttConnection->referencesMutex ) );
\r
1248 disconnected = mqttConnection->disconnected;
\r
1249 IotMutex_Unlock( &( mqttConnection->referencesMutex ) );
\r
1251 /* Only send a DISCONNECT packet if the connection is active and the "cleanup only"
\r
1252 * flag is not set. */
\r
1253 if( disconnected == false )
\r
1255 if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == 0 )
\r
1257 /* Create a DISCONNECT operation. This function blocks until the DISCONNECT
\r
1258 * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */
\r
1259 status = _IotMqtt_CreateOperation( mqttConnection,
\r
1260 IOT_MQTT_FLAG_WAITABLE,
\r
1264 if( status == IOT_MQTT_SUCCESS )
\r
1266 /* Ensure that the members set by operation creation and serialization
\r
1267 * are appropriate for a blocking DISCONNECT. */
\r
1268 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1269 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
\r
1270 == IOT_MQTT_FLAG_WAITABLE );
\r
1271 IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
\r
1273 /* Set the operation type. */
\r
1274 pOperation->u.operation.type = IOT_MQTT_DISCONNECT;
\r
1276 /* Choose a disconnect serializer. */
\r
1277 IotMqttError_t ( * serializeDisconnect )( uint8_t **,
\r
1278 size_t * ) = _IotMqtt_SerializeDisconnect;
\r
1280 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1281 if( mqttConnection->pSerializer != NULL )
\r
1283 if( mqttConnection->pSerializer->serialize.disconnect != NULL )
\r
1285 serializeDisconnect = mqttConnection->pSerializer->serialize.disconnect;
\r
1289 EMPTY_ELSE_MARKER;
\r
1294 EMPTY_ELSE_MARKER;
\r
1296 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
1298 /* Generate a DISCONNECT packet. */
\r
1299 status = serializeDisconnect( &( pOperation->u.operation.pMqttPacket ),
\r
1300 &( pOperation->u.operation.packetSize ) );
\r
1304 EMPTY_ELSE_MARKER;
\r
1307 if( status == IOT_MQTT_SUCCESS )
\r
1309 /* Check the serialized MQTT packet. */
\r
1310 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1311 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1313 /* Schedule the DISCONNECT operation for network transmission. */
\r
1314 if( _IotMqtt_ScheduleOperation( pOperation,
\r
1315 _IotMqtt_ProcessSend,
\r
1316 0 ) != IOT_MQTT_SUCCESS )
\r
1318 IotLogWarn( "(MQTT connection %p) Failed to schedule DISCONNECT for sending.",
\r
1320 _IotMqtt_DestroyOperation( pOperation );
\r
1324 /* Wait a short time for the DISCONNECT packet to be transmitted. */
\r
1325 status = IotMqtt_Wait( pOperation,
\r
1326 IOT_MQTT_RESPONSE_WAIT_MS );
\r
1328 /* A wait on DISCONNECT should only ever return SUCCESS, TIMEOUT,
\r
1329 * or NETWORK ERROR. */
\r
1330 if( status == IOT_MQTT_SUCCESS )
\r
1332 IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );
\r
1336 IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||
\r
1337 ( status == IOT_MQTT_NETWORK_ERROR ) );
\r
1339 IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",
\r
1341 IotMqtt_strerror( status ) );
\r
1347 EMPTY_ELSE_MARKER;
\r
1352 EMPTY_ELSE_MARKER;
\r
1357 EMPTY_ELSE_MARKER;
\r
1360 /* Close the underlying network connection. This also cleans up keep-alive. */
\r
1361 _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,
\r
1364 /* Check if the connection may be destroyed. */
\r
1365 IotMutex_Lock( &( mqttConnection->referencesMutex ) );
\r
1367 /* At this point, the connection should be marked disconnected. */
\r
1368 IotMqtt_Assert( mqttConnection->disconnected == true );
\r
1370 /* Attempt cancel and destroy each operation in the connection's lists. */
\r
1371 IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),
\r
1372 _mqttOperation_tryDestroy,
\r
1373 offsetof( _mqttOperation_t, link ) );
\r
1375 IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),
\r
1376 _mqttOperation_tryDestroy,
\r
1377 offsetof( _mqttOperation_t, link ) );
\r
1379 IotMutex_Unlock( &( mqttConnection->referencesMutex ) );
\r
1381 /* Decrement the connection reference count and destroy it if possible. */
\r
1382 _IotMqtt_DecrementConnectionReferences( mqttConnection );
\r
1385 /*-----------------------------------------------------------*/
\r
1387 IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
\r
1388 const IotMqttSubscription_t * pSubscriptionList,
\r
1389 size_t subscriptionCount,
\r
1391 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1392 IotMqttOperation_t * pSubscribeOperation )
\r
1394 return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,
\r
1396 pSubscriptionList,
\r
1397 subscriptionCount,
\r
1400 pSubscribeOperation );
\r
1403 /*-----------------------------------------------------------*/
\r
1405 IotMqttError_t IotMqtt_TimedSubscribe( IotMqttConnection_t mqttConnection,
\r
1406 const IotMqttSubscription_t * pSubscriptionList,
\r
1407 size_t subscriptionCount,
\r
1409 uint32_t timeoutMs )
\r
1411 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1412 IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1414 /* Flags are not used, but the parameter is present for future compatibility. */
\r
1417 /* Call the asynchronous SUBSCRIBE function. */
\r
1418 status = IotMqtt_Subscribe( mqttConnection,
\r
1419 pSubscriptionList,
\r
1420 subscriptionCount,
\r
1421 IOT_MQTT_FLAG_WAITABLE,
\r
1423 &subscribeOperation );
\r
1425 /* Wait for the SUBSCRIBE operation to complete. */
\r
1426 if( status == IOT_MQTT_STATUS_PENDING )
\r
1428 status = IotMqtt_Wait( subscribeOperation, timeoutMs );
\r
1432 EMPTY_ELSE_MARKER;
\r
1435 /* Ensure that a status was set. */
\r
1436 IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );
\r
1441 /*-----------------------------------------------------------*/
\r
1443 IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
\r
1444 const IotMqttSubscription_t * pSubscriptionList,
\r
1445 size_t subscriptionCount,
\r
1447 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1448 IotMqttOperation_t * pUnsubscribeOperation )
\r
1450 return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,
\r
1452 pSubscriptionList,
\r
1453 subscriptionCount,
\r
1456 pUnsubscribeOperation );
\r
1459 /*-----------------------------------------------------------*/
\r
1461 IotMqttError_t IotMqtt_TimedUnsubscribe( IotMqttConnection_t mqttConnection,
\r
1462 const IotMqttSubscription_t * pSubscriptionList,
\r
1463 size_t subscriptionCount,
\r
1465 uint32_t timeoutMs )
\r
1467 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1468 IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1470 /* Flags are not used, but the parameter is present for future compatibility. */
\r
1473 /* Call the asynchronous UNSUBSCRIBE function. */
\r
1474 status = IotMqtt_Unsubscribe( mqttConnection,
\r
1475 pSubscriptionList,
\r
1476 subscriptionCount,
\r
1477 IOT_MQTT_FLAG_WAITABLE,
\r
1479 &unsubscribeOperation );
\r
1481 /* Wait for the UNSUBSCRIBE operation to complete. */
\r
1482 if( status == IOT_MQTT_STATUS_PENDING )
\r
1484 status = IotMqtt_Wait( unsubscribeOperation, timeoutMs );
\r
1488 EMPTY_ELSE_MARKER;
\r
1491 /* Ensure that a status was set. */
\r
1492 IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );
\r
1497 /*-----------------------------------------------------------*/
\r
1499 IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
\r
1500 const IotMqttPublishInfo_t * pPublishInfo,
\r
1502 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1503 IotMqttOperation_t * pPublishOperation )
\r
1505 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
1506 _mqttOperation_t * pOperation = NULL;
\r
1507 uint8_t ** pPacketIdentifierHigh = NULL;
\r
1509 /* Default PUBLISH serializer function. */
\r
1510 IotMqttError_t ( * serializePublish )( const IotMqttPublishInfo_t *,
\r
1514 uint8_t ** ) = _IotMqtt_SerializePublish;
\r
1516 /* Check that the PUBLISH information is valid. */
\r
1517 if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,
\r
1518 pPublishInfo ) == false )
\r
1520 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1524 EMPTY_ELSE_MARKER;
\r
1527 /* Check that no notification is requested for a QoS 0 publish. */
\r
1528 if( pPublishInfo->qos == IOT_MQTT_QOS_0 )
\r
1530 if( pCallbackInfo != NULL )
\r
1532 IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );
\r
1534 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1536 else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )
\r
1538 IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );
\r
1540 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1544 EMPTY_ELSE_MARKER;
\r
1547 if( pPublishOperation != NULL )
\r
1549 IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );
\r
1553 EMPTY_ELSE_MARKER;
\r
1558 EMPTY_ELSE_MARKER;
\r
1561 /* Check that a reference pointer is provided for a waitable operation. */
\r
1562 if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
\r
1564 if( pPublishOperation == NULL )
\r
1566 IotLogError( "Reference must be provided for a waitable PUBLISH." );
\r
1568 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1572 EMPTY_ELSE_MARKER;
\r
1577 EMPTY_ELSE_MARKER;
\r
1580 /* Create a PUBLISH operation. */
\r
1581 status = _IotMqtt_CreateOperation( mqttConnection,
\r
1586 if( status != IOT_MQTT_SUCCESS )
\r
1588 IOT_GOTO_CLEANUP();
\r
1592 EMPTY_ELSE_MARKER;
\r
1595 /* Check the PUBLISH operation data and set the operation type. */
\r
1596 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1597 pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;
\r
1599 /* Choose a PUBLISH serializer function. */
\r
1600 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1601 if( mqttConnection->pSerializer != NULL )
\r
1603 if( mqttConnection->pSerializer->serialize.publish != NULL )
\r
1605 serializePublish = mqttConnection->pSerializer->serialize.publish;
\r
1609 EMPTY_ELSE_MARKER;
\r
1614 EMPTY_ELSE_MARKER;
\r
1616 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
1618 /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */
\r
1619 if( mqttConnection->awsIotMqttMode == true )
\r
1621 pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );
\r
1625 EMPTY_ELSE_MARKER;
\r
1628 /* Generate a PUBLISH packet from pPublishInfo. */
\r
1629 status = serializePublish( pPublishInfo,
\r
1630 &( pOperation->u.operation.pMqttPacket ),
\r
1631 &( pOperation->u.operation.packetSize ),
\r
1632 &( pOperation->u.operation.packetIdentifier ),
\r
1633 pPacketIdentifierHigh );
\r
1635 if( status != IOT_MQTT_SUCCESS )
\r
1637 IOT_GOTO_CLEANUP();
\r
1641 EMPTY_ELSE_MARKER;
\r
1644 /* Check the serialized MQTT packet. */
\r
1645 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1646 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1648 /* Initialize PUBLISH retry if retryLimit is set. */
\r
1649 if( pPublishInfo->retryLimit > 0 )
\r
1651 /* A QoS 0 PUBLISH may not be retried. */
\r
1652 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1654 pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;
\r
1655 pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;
\r
1659 EMPTY_ELSE_MARKER;
\r
1664 EMPTY_ELSE_MARKER;
\r
1667 /* Set the reference, if provided. */
\r
1668 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1670 if( pPublishOperation != NULL )
\r
1672 *pPublishOperation = pOperation;
\r
1676 EMPTY_ELSE_MARKER;
\r
1681 EMPTY_ELSE_MARKER;
\r
1684 /* Add the PUBLISH operation to the send queue for network transmission. */
\r
1685 status = _IotMqtt_ScheduleOperation( pOperation,
\r
1686 _IotMqtt_ProcessSend,
\r
1689 if( status != IOT_MQTT_SUCCESS )
\r
1691 IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",
\r
1694 /* Clear the previously set (and now invalid) reference. */
\r
1695 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1697 if( pPublishOperation != NULL )
\r
1699 *pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1703 EMPTY_ELSE_MARKER;
\r
1708 EMPTY_ELSE_MARKER;
\r
1711 IOT_GOTO_CLEANUP();
\r
1715 EMPTY_ELSE_MARKER;
\r
1718 /* Clean up the PUBLISH operation if this function fails. Otherwise, set the
\r
1719 * appropriate return code based on QoS. */
\r
1720 IOT_FUNCTION_CLEANUP_BEGIN();
\r
1722 if( status != IOT_MQTT_SUCCESS )
\r
1724 if( pOperation != NULL )
\r
1726 _IotMqtt_DestroyOperation( pOperation );
\r
1730 EMPTY_ELSE_MARKER;
\r
1735 if( pPublishInfo->qos > IOT_MQTT_QOS_0 )
\r
1737 status = IOT_MQTT_STATUS_PENDING;
\r
1741 EMPTY_ELSE_MARKER;
\r
1744 IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",
\r
1748 IOT_FUNCTION_CLEANUP_END();
\r
1751 /*-----------------------------------------------------------*/
\r
1753 IotMqttError_t IotMqtt_TimedPublish( IotMqttConnection_t mqttConnection,
\r
1754 const IotMqttPublishInfo_t * pPublishInfo,
\r
1756 uint32_t timeoutMs )
\r
1758 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1759 IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER,
\r
1760 * pPublishOperation = NULL;
\r
1762 /* Clear the flags. */
\r
1765 /* Set the waitable flag and reference for QoS 1 PUBLISH. */
\r
1766 if( pPublishInfo->qos == IOT_MQTT_QOS_1 )
\r
1768 flags = IOT_MQTT_FLAG_WAITABLE;
\r
1769 pPublishOperation = &publishOperation;
\r
1773 EMPTY_ELSE_MARKER;
\r
1776 /* Call the asynchronous PUBLISH function. */
\r
1777 status = IotMqtt_Publish( mqttConnection,
\r
1781 pPublishOperation );
\r
1783 /* Wait for a queued QoS 1 PUBLISH to complete. */
\r
1784 if( pPublishInfo->qos == IOT_MQTT_QOS_1 )
\r
1786 if( status == IOT_MQTT_STATUS_PENDING )
\r
1788 status = IotMqtt_Wait( publishOperation, timeoutMs );
\r
1792 EMPTY_ELSE_MARKER;
\r
1797 EMPTY_ELSE_MARKER;
\r
1803 /*-----------------------------------------------------------*/
\r
1805 IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,
\r
1806 uint32_t timeoutMs )
\r
1808 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
1809 _mqttConnection_t * pMqttConnection = operation->pMqttConnection;
\r
1811 /* Validate the given operation reference. */
\r
1812 if( _IotMqtt_ValidateOperation( operation ) == false )
\r
1814 status = IOT_MQTT_BAD_PARAMETER;
\r
1818 EMPTY_ELSE_MARKER;
\r
1821 /* Check the MQTT connection status. */
\r
1822 if( status == IOT_MQTT_SUCCESS )
\r
1824 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
1826 if( pMqttConnection->disconnected == true )
\r
1828 IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "
\r
1829 "Operation cannot be waited on.",
\r
1831 IotMqtt_OperationType( operation->u.operation.type ),
\r
1834 status = IOT_MQTT_NETWORK_ERROR;
\r
1838 IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",
\r
1840 IotMqtt_OperationType( operation->u.operation.type ),
\r
1844 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
1846 /* Only wait on an operation if the MQTT connection is active. */
\r
1847 if( status == IOT_MQTT_SUCCESS )
\r
1849 if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),
\r
1850 timeoutMs ) == false )
\r
1852 status = IOT_MQTT_TIMEOUT;
\r
1854 /* Attempt to cancel the job of the timed out operation. */
\r
1855 ( void ) _IotMqtt_DecrementOperationReferences( operation, true );
\r
1857 /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */
\r
1858 if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )
\r
1860 IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"
\r
1861 " subscriptions of timed-out SUBSCRIBE.",
\r
1865 _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,
\r
1866 operation->u.operation.packetIdentifier,
\r
1871 EMPTY_ELSE_MARKER;
\r
1876 /* Retrieve the status of the completed operation. */
\r
1877 status = operation->u.operation.status;
\r
1880 IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",
\r
1882 IotMqtt_OperationType( operation->u.operation.type ),
\r
1884 IotMqtt_strerror( status ) );
\r
1888 EMPTY_ELSE_MARKER;
\r
1891 /* Wait is finished; decrement operation reference count. */
\r
1892 if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )
\r
1894 _IotMqtt_DestroyOperation( operation );
\r
1898 EMPTY_ELSE_MARKER;
\r
1903 EMPTY_ELSE_MARKER;
\r
1909 /*-----------------------------------------------------------*/
\r
1911 const char * IotMqtt_strerror( IotMqttError_t status )
\r
1913 const char * pMessage = NULL;
\r
1917 case IOT_MQTT_SUCCESS:
\r
1918 pMessage = "SUCCESS";
\r
1921 case IOT_MQTT_STATUS_PENDING:
\r
1922 pMessage = "PENDING";
\r
1925 case IOT_MQTT_INIT_FAILED:
\r
1926 pMessage = "INITIALIZATION FAILED";
\r
1929 case IOT_MQTT_BAD_PARAMETER:
\r
1930 pMessage = "BAD PARAMETER";
\r
1933 case IOT_MQTT_NO_MEMORY:
\r
1934 pMessage = "NO MEMORY";
\r
1937 case IOT_MQTT_NETWORK_ERROR:
\r
1938 pMessage = "NETWORK ERROR";
\r
1941 case IOT_MQTT_SCHEDULING_ERROR:
\r
1942 pMessage = "SCHEDULING ERROR";
\r
1945 case IOT_MQTT_BAD_RESPONSE:
\r
1946 pMessage = "BAD RESPONSE RECEIVED";
\r
1949 case IOT_MQTT_TIMEOUT:
\r
1950 pMessage = "TIMEOUT";
\r
1953 case IOT_MQTT_SERVER_REFUSED:
\r
1954 pMessage = "SERVER REFUSED";
\r
1957 case IOT_MQTT_RETRY_NO_RESPONSE:
\r
1958 pMessage = "NO RESPONSE";
\r
1962 pMessage = "INVALID STATUS";
\r
1969 /*-----------------------------------------------------------*/
\r
1971 const char * IotMqtt_OperationType( IotMqttOperationType_t operation )
\r
1973 const char * pMessage = NULL;
\r
1975 switch( operation )
\r
1977 case IOT_MQTT_CONNECT:
\r
1978 pMessage = "CONNECT";
\r
1981 case IOT_MQTT_PUBLISH_TO_SERVER:
\r
1982 pMessage = "PUBLISH";
\r
1985 case IOT_MQTT_PUBACK:
\r
1986 pMessage = "PUBACK";
\r
1989 case IOT_MQTT_SUBSCRIBE:
\r
1990 pMessage = "SUBSCRIBE";
\r
1993 case IOT_MQTT_UNSUBSCRIBE:
\r
1994 pMessage = "UNSUBSCRIBE";
\r
1997 case IOT_MQTT_PINGREQ:
\r
1998 pMessage = "PINGREQ";
\r
2001 case IOT_MQTT_DISCONNECT:
\r
2002 pMessage = "DISCONNECT";
\r
2006 pMessage = "INVALID OPERATION";
\r
2013 /*-----------------------------------------------------------*/
\r
2015 /* Provide access to internal functions and variables if testing. */
\r
2016 #if IOT_BUILD_TESTS == 1
\r
2017 #include "iot_test_access_mqtt_api.c"
\r