3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
24 * @file iot_mqtt_api.c
\r
25 * @brief Implements most user-facing functions of the MQTT library.
\r
28 /* The config header is always included first. */
\r
29 #include "iot_config.h"
\r
31 /* Standard includes. */
\r
34 /* Error handling include. */
\r
35 #include "iot_error.h"
\r
37 /* MQTT internal include. */
\r
38 #include "private/iot_mqtt_internal.h"
\r
40 /* Platform layer includes. */
\r
41 #include "platform/iot_clock.h"
\r
42 #include "platform/iot_threads.h"
\r
44 /* Atomics include. */
\r
45 #include "iot_atomic.h"
\r
47 /* Validate MQTT configuration settings. */
\r
48 #if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1
\r
49 #error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1."
\r
51 #if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1
\r
52 #error "IOT_MQTT_ENABLE_METRICS must be 0 or 1."
\r
54 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1
\r
55 #error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1."
\r
57 #if IOT_MQTT_RESPONSE_WAIT_MS <= 0
\r
58 #error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative."
\r
60 #if IOT_MQTT_RETRY_MS_CEILING <= 0
\r
61 #error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative."
\r
64 /*-----------------------------------------------------------*/
\r
/**
 * @brief Uninitialized value for @ref _initCalled.
 */
#define MQTT_LIBRARY_UNINITIALIZED    ( ( uint32_t ) 0 )

/**
 * @brief Initialized value for @ref _initCalled.
 */
#define MQTT_LIBRARY_INITIALIZED      ( ( uint32_t ) 1 )
\r
76 /*-----------------------------------------------------------*/
\r
79 * @brief Check if the library is initialized.
\r
81 * @return `true` if IotMqtt_Init was called; `false` otherwise.
\r
83 static bool _checkInit( void );
\r
86 * @brief Set the unsubscribed flag of an MQTT subscription.
\r
88 * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
\r
89 * @param[in] pMatch Not used.
\r
91 * @return Always returns `true`.
\r
93 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,
\r
97 * @brief Destroy an MQTT subscription if its reference count is 0.
\r
99 * @param[in] pData The subscription to destroy. This parameter is of type
\r
100 * `void*` for compatibility with [free]
\r
101 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
103 static void _mqttSubscription_tryDestroy( void * pData );
\r
106 * @brief Decrement the reference count of an MQTT operation and attempt to
\r
109 * @param[in] pData The operation data to destroy. This parameter is of type
\r
110 * `void*` for compatibility with [free]
\r
111 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
113 static void _mqttOperation_tryDestroy( void * pData );
\r
116 * @brief Initialize the keep-alive operation for an MQTT connection.
\r
118 * @param[in] pNetworkInfo User-provided network information for the new
\r
120 * @param[in] keepAliveSeconds User-provided keep-alive interval.
\r
121 * @param[out] pMqttConnection The MQTT connection associated with the keep-alive.
\r
123 * @return `true` if the keep-alive job was successfully created; `false` otherwise.
\r
125 static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
126 uint16_t keepAliveSeconds,
\r
127 _mqttConnection_t * pMqttConnection );
\r
130 * @brief Initialize a network connection, creating it if necessary.
\r
132 * @param[in] pNetworkInfo User-provided network information for the connection
\r
134 * @param[out] pNetworkConnection On success, the created and/or initialized network connection.
\r
135 * @param[out] pCreatedNewNetworkConnection On success, `true` if a new network connection was created; `false` if an existing one will be used.
\r
137 * @return Any #IotNetworkError_t, as defined by the network stack.
\r
139 static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
140 IotNetworkConnection_t * pNetworkConnection,
\r
141 bool * pCreatedNewNetworkConnection );
\r
144 * @brief Creates a new MQTT connection and initializes its members.
\r
146 * @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server.
\r
147 * @param[in] pNetworkInfo User-provided network information for the new
\r
149 * @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection.
\r
151 * @return Pointer to a newly-created MQTT connection; `NULL` on failure.
\r
153 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
\r
154 const IotMqttNetworkInfo_t * pNetworkInfo,
\r
155 uint16_t keepAliveSeconds );
\r
158 * @brief Destroys the members of an MQTT connection.
\r
160 * @param[in] pMqttConnection Which connection to destroy.
\r
162 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection );
\r
165 * @brief Common setup function for subscribe and unsubscribe operations.
\r
167 * See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a
\r
168 * description of the parameters and return values.
\r
170 static IotMqttError_t _subscriptionCommonSetup( IotMqttOperationType_t operation,
\r
171 IotMqttConnection_t mqttConnection,
\r
172 const IotMqttSubscription_t * pSubscriptionList,
\r
173 size_t subscriptionCount,
\r
175 IotMqttOperation_t * const pOperationReference );
\r
178 * @brief Utility function for creating and serializing subscription requests
\r
180 * See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a
\r
181 * description of the parameters and return values.
\r
183 static IotMqttError_t _subscriptionCreateAndSerialize( IotMqttOperationType_t operation,
\r
184 IotMqttConnection_t mqttConnection,
\r
185 IotMqttSerializeSubscribe_t serializeSubscription,
\r
186 const IotMqttSubscription_t * pSubscriptionList,
\r
187 size_t subscriptionCount,
\r
189 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
190 _mqttOperation_t ** ppSubscriptionOperation );
\r
193 * @brief The common component of both @ref mqtt_function_subscribeasync and @ref
\r
194 * mqtt_function_unsubscribeasync.
\r
196 * See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a
\r
197 * description of the parameters and return values.
\r
199 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
\r
200 IotMqttConnection_t mqttConnection,
\r
201 IotMqttSerializeSubscribe_t serializeSubscription,
\r
202 const IotMqttSubscription_t * pSubscriptionList,
\r
203 size_t subscriptionCount,
\r
205 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
206 IotMqttOperation_t * const pOperationReference );
\r
/**
 * @cond DOXYGEN_IGNORE
 * Doxygen should ignore this section.
 *
 * Declaration of local MQTT serializer override selectors.
 *
 * With overrides enabled, each selector is generated by
 * _SERIALIZER_OVERRIDE_SELECTOR; with overrides disabled, each selector is a
 * macro that ignores its argument and names the default function directly.
 */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializePingreq_t,
                                   _getMqttPingreqSerializer,
                                   _IotMqtt_SerializePingreq,
                                   serialize.pingreq )
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqtt_SerializePublish_t,
                                   _getMqttPublishSerializer,
                                   _IotMqtt_SerializePublish,
                                   serialize.publish )
    /* NOTE(review): the member name `freePacket` was not visible in the
     * reviewed text; confirm against the IotMqttSerializer_t definition. */
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqttFreePacket_t,
                                   _getMqttFreePacketFunc,
                                   _IotMqtt_FreePacket,
                                   freePacket )
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeSubscribe_t,
                                   _getMqttSubscribeSerializer,
                                   _IotMqtt_SerializeSubscribe,
                                   serialize.subscribe )
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeSubscribe_t,
                                   _getMqttUnsubscribeSerializer,
                                   _IotMqtt_SerializeUnsubscribe,
                                   serialize.unsubscribe )
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeConnect_t,
                                   _getMqttConnectSerializer,
                                   _IotMqtt_SerializeConnect,
                                   serialize.connect )
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeDisconnect_t,
                                   _getMqttDisconnectSerializer,
                                   _IotMqtt_SerializeDisconnect,
                                   serialize.disconnect )
#else /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
    #define _getMqttPingreqSerializer( pSerializer )        _IotMqtt_SerializePingreq
    #define _getMqttPublishSerializer( pSerializer )        _IotMqtt_SerializePublish
    #define _getMqttFreePacketFunc( pSerializer )           _IotMqtt_FreePacket
    #define _getMqttSubscribeSerializer( pSerializer )      _IotMqtt_SerializeSubscribe
    #define _getMqttUnsubscribeSerializer( pSerializer )    _IotMqtt_SerializeUnsubscribe
    #define _getMqttConnectSerializer( pSerializer )        _IotMqtt_SerializeConnect
    #define _getMqttDisconnectSerializer( pSerializer )     _IotMqtt_SerializeDisconnect
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/** @endcond */
\r
254 /*-----------------------------------------------------------*/
\r
257 * @brief Tracks whether @ref mqtt_function_init has been called.
\r
259 * API functions will fail if @ref mqtt_function_init was not called.
\r
261 static volatile uint32_t _initCalled = MQTT_LIBRARY_UNINITIALIZED;
\r
263 /*-----------------------------------------------------------*/
\r
265 static bool _checkInit( void )
\r
267 bool status = true;
\r
269 if( _initCalled == MQTT_LIBRARY_UNINITIALIZED )
\r
271 IotLogError( "IotMqtt_Init was not called." );
\r
283 /*-----------------------------------------------------------*/
\r
285 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,
\r
288 /* Because this function is called from a container function, the given link
\r
289 * must never be NULL. */
\r
290 IotMqtt_Assert( pSubscriptionLink != NULL );
\r
292 _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
\r
296 /* Silence warnings about unused parameters. */
\r
299 /* Set the unsubscribed flag. */
\r
300 pSubscription->unsubscribed = true;
\r
305 /*-----------------------------------------------------------*/
\r
307 static void _mqttSubscription_tryDestroy( void * pData )
\r
309 _mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData;
\r
311 /* Reference count must not be negative. */
\r
312 IotMqtt_Assert( pSubscription->references >= 0 );
\r
314 /* Unsubscribed flag should be set. */
\r
315 IotMqtt_Assert( pSubscription->unsubscribed == true );
\r
317 /* Free the subscription if it has no references. */
\r
318 if( pSubscription->references == 0 )
\r
320 IotMqtt_FreeSubscription( pSubscription );
\r
328 /*-----------------------------------------------------------*/
\r
330 static void _mqttOperation_tryDestroy( void * pData )
\r
332 _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData;
\r
333 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
335 /* Incoming PUBLISH operations may always be freed. */
\r
336 if( pOperation->incomingPublish == true )
\r
338 /* Cancel the incoming PUBLISH operation's job. */
\r
339 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
343 /* If the operation's job was not canceled, it must be already executing.
\r
344 * Any other return value is invalid. */
\r
345 IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
\r
346 ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
\r
348 /* Check if the incoming PUBLISH job was canceled. */
\r
349 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
351 /* Job was canceled. Process incoming PUBLISH now to clean up. */
\r
352 _IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL,
\r
358 /* The executing job will process the PUBLISH, so nothing is done here. */
\r
364 /* Decrement reference count and destroy operation if possible. */
\r
365 if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true )
\r
367 _IotMqtt_DestroyOperation( pOperation );
\r
376 /*-----------------------------------------------------------*/
\r
378 static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
379 uint16_t keepAliveSeconds,
\r
380 _mqttConnection_t * pMqttConnection )
\r
382 bool status = true;
\r
383 IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
\r
384 IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS;
\r
386 /* Network information is not used when MQTT packet serializers are disabled. */
\r
387 ( void ) pNetworkInfo;
\r
389 /* Set PINGREQ operation members. */
\r
390 pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;
\r
392 /* Convert the keep-alive interval to milliseconds. */
\r
393 pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;
\r
394 pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;
\r
396 /* Generate a PINGREQ packet. */
\r
397 serializeStatus = _getMqttPingreqSerializer( pMqttConnection->pSerializer )( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),
\r
398 &( pMqttConnection->pingreq.u.operation.packetSize ) );
\r
400 if( serializeStatus != IOT_MQTT_SUCCESS )
\r
402 IotLogError( "Failed to allocate PINGREQ packet for new connection." );
\r
408 /* Create the task pool job that processes keep-alive. */
\r
409 jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
\r
411 &( pMqttConnection->pingreq.jobStorage ),
\r
412 &( pMqttConnection->pingreq.job ) );
\r
414 /* Task pool job creation for a pre-allocated job should never fail.
\r
415 * Abort the program if it does. */
\r
416 if( jobStatus != IOT_TASKPOOL_SUCCESS )
\r
418 IotLogError( "Failed to create keep-alive job for new connection." );
\r
420 IotMqtt_Assert( false );
\r
427 /* Keep-alive references its MQTT connection, so increment reference. */
\r
428 ( pMqttConnection->references )++;
\r
434 /*-----------------------------------------------------------*/
\r
436 static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
437 IotNetworkConnection_t * pNetworkConnection,
\r
438 bool * pCreatedNewNetworkConnection )
\r
440 IOT_FUNCTION_ENTRY( IotNetworkError_t, IOT_NETWORK_SUCCESS );
\r
442 /* Network info must not be NULL. */
\r
443 if( pNetworkInfo == NULL )
\r
445 IotLogError( "Network information cannot be NULL." );
\r
447 IOT_SET_AND_GOTO_CLEANUP( IOT_NETWORK_BAD_PARAMETER );
\r
454 /* Create a new network connection if requested. Otherwise, copy the existing
\r
455 * network connection. */
\r
456 if( pNetworkInfo->createNetworkConnection == true )
\r
458 status = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,
\r
459 pNetworkInfo->u.setup.pNetworkCredentialInfo,
\r
460 pNetworkConnection );
\r
462 if( status == IOT_NETWORK_SUCCESS )
\r
464 /* This MQTT connection owns the network connection it created and
\r
465 * should destroy it on cleanup. */
\r
466 *pCreatedNewNetworkConnection = true;
\r
470 IotLogError( "Failed to create network connection: %d", status );
\r
472 IOT_GOTO_CLEANUP();
\r
477 /* A connection already exists; the caller should not destroy
\r
478 * it on cleanup. */
\r
479 *pNetworkConnection = pNetworkInfo->u.pNetworkConnection;
\r
480 *pCreatedNewNetworkConnection = false;
\r
483 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
486 /*-----------------------------------------------------------*/
\r
488 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
\r
489 const IotMqttNetworkInfo_t * pNetworkInfo,
\r
490 uint16_t keepAliveSeconds )
\r
492 IOT_FUNCTION_ENTRY( bool, true );
\r
493 _mqttConnection_t * pMqttConnection = NULL;
\r
494 bool referencesMutexCreated = false, subscriptionMutexCreated = false;
\r
496 /* Allocate memory for the new MQTT connection. */
\r
497 pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) );
\r
499 if( pMqttConnection == NULL )
\r
501 IotLogError( "Failed to allocate memory for new connection." );
\r
503 IOT_SET_AND_GOTO_CLEANUP( false );
\r
507 /* Clear the MQTT connection, then copy the MQTT server mode, network
\r
508 * interface, and disconnect callback. */
\r
509 ( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) );
\r
510 pMqttConnection->awsIotMqttMode = awsIotMqttMode;
\r
511 pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface;
\r
512 pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback;
\r
514 /* Start a new MQTT connection with a reference count of 1. */
\r
515 pMqttConnection->references = 1;
\r
518 /* Create the references mutex for a new connection. It is a recursive mutex. */
\r
519 referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true );
\r
521 if( referencesMutexCreated == false )
\r
523 IotLogError( "Failed to create references mutex for new connection." );
\r
525 IOT_SET_AND_GOTO_CLEANUP( false );
\r
532 /* Create the subscription mutex for a new connection. */
\r
533 subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false );
\r
535 if( subscriptionMutexCreated == false )
\r
537 IotLogError( "Failed to create subscription mutex for new connection." );
\r
539 IOT_SET_AND_GOTO_CLEANUP( false );
\r
546 /* Create the new connection's subscription and operation lists. */
\r
547 IotListDouble_Create( &( pMqttConnection->subscriptionList ) );
\r
548 IotListDouble_Create( &( pMqttConnection->pendingProcessing ) );
\r
549 IotListDouble_Create( &( pMqttConnection->pendingResponse ) );
\r
551 /* Check if keep-alive is active for this connection. */
\r
552 if( keepAliveSeconds != 0 )
\r
554 if( _createKeepAliveOperation( pNetworkInfo,
\r
556 pMqttConnection ) == false )
\r
558 IOT_SET_AND_GOTO_CLEANUP( false );
\r
570 /* Clean up mutexes and connection if this function failed. */
\r
571 IOT_FUNCTION_CLEANUP_BEGIN();
\r
573 if( status == false )
\r
575 if( subscriptionMutexCreated == true )
\r
577 IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );
\r
584 if( referencesMutexCreated == true )
\r
586 IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );
\r
593 if( pMqttConnection != NULL )
\r
595 IotMqtt_FreeConnection( pMqttConnection );
\r
596 pMqttConnection = NULL;
\r
608 return pMqttConnection;
\r
611 /*-----------------------------------------------------------*/
\r
613 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
\r
615 IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
\r
617 /* Clean up keep-alive if still allocated. */
\r
618 if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
\r
620 IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );
\r
622 _getMqttFreePacketFunc( pMqttConnection->pSerializer )( pMqttConnection->pingreq.u.operation.pMqttPacket );
\r
624 /* Clear data about the keep-alive. */
\r
625 pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
\r
626 pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
\r
627 pMqttConnection->pingreq.u.operation.packetSize = 0;
\r
629 /* Decrement reference count. */
\r
630 pMqttConnection->references--;
\r
637 /* A connection to be destroyed should have no keep-alive and at most 1
\r
639 IotMqtt_Assert( pMqttConnection->references <= 1 );
\r
640 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );
\r
641 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );
\r
642 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );
\r
644 /* Remove all subscriptions. */
\r
645 IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
\r
646 IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
\r
647 _mqttSubscription_setUnsubscribe,
\r
649 _mqttSubscription_tryDestroy,
\r
650 offsetof( _mqttSubscription_t, link ) );
\r
651 IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
\r
653 /* Destroy an owned network connection. */
\r
654 if( pMqttConnection->ownNetworkConnection == true )
\r
656 networkStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );
\r
658 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
660 IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",
\r
665 IotLogInfo( "(MQTT connection %p) Network connection destroyed.",
\r
674 /* Destroy mutexes. */
\r
675 IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );
\r
676 IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );
\r
678 IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );
\r
680 /* Free connection. */
\r
681 IotMqtt_FreeConnection( pMqttConnection );
\r
684 /*-----------------------------------------------------------*/
\r
685 static IotMqttError_t _subscriptionCommonSetup( IotMqttOperationType_t operation,
\r
686 IotMqttConnection_t mqttConnection,
\r
687 const IotMqttSubscription_t * pSubscriptionList,
\r
688 size_t subscriptionCount,
\r
690 IotMqttOperation_t * const pOperationReference )
\r
692 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
694 /* This function should only be called for subscribe or unsubscribe. */
\r
695 IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||
\r
696 ( operation == IOT_MQTT_UNSUBSCRIBE ) );
\r
698 /* Check that IotMqtt_Init was called. */
\r
699 if( _checkInit() == false )
\r
701 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );
\r
708 /* Check that all elements in the subscription list are valid. */
\r
709 if( _IotMqtt_ValidateSubscriptionList( operation,
\r
710 mqttConnection->awsIotMqttMode,
\r
712 subscriptionCount ) == false )
\r
714 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
721 /* Check that a reference pointer is provided for a waitable operation. */
\r
722 if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
\r
724 if( pOperationReference == NULL )
\r
726 IotLogError( "Reference must be provided for a waitable %s.",
\r
727 IotMqtt_OperationType( operation ) );
\r
729 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
741 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
744 /*-----------------------------------------------------------*/
\r
746 static IotMqttError_t _subscriptionCreateAndSerialize( IotMqttOperationType_t operation,
\r
747 IotMqttConnection_t mqttConnection,
\r
748 IotMqttSerializeSubscribe_t serializeSubscription,
\r
749 const IotMqttSubscription_t * pSubscriptionList,
\r
750 size_t subscriptionCount,
\r
752 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
753 _mqttOperation_t ** ppSubscriptionOperation )
\r
755 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
756 _mqttOperation_t * pSubscriptionOperation = NULL;
\r
758 /* Create a subscription operation. */
\r
759 status = _IotMqtt_CreateOperation( mqttConnection,
\r
762 ppSubscriptionOperation );
\r
764 if( status != IOT_MQTT_SUCCESS )
\r
766 IOT_GOTO_CLEANUP();
\r
770 pSubscriptionOperation = ( *ppSubscriptionOperation );
\r
773 /* Check the subscription operation data and set the operation type. */
\r
774 IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
775 IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );
\r
776 pSubscriptionOperation->u.operation.type = operation;
\r
778 /* Generate a subscription packet from the subscription list. */
\r
779 status = serializeSubscription( pSubscriptionList,
\r
781 &( pSubscriptionOperation->u.operation.pMqttPacket ),
\r
782 &( pSubscriptionOperation->u.operation.packetSize ),
\r
783 &( pSubscriptionOperation->u.operation.packetIdentifier ) );
\r
785 if( status != IOT_MQTT_SUCCESS )
\r
787 IOT_GOTO_CLEANUP();
\r
794 /* Check the serialized MQTT packet. */
\r
795 IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );
\r
796 IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );
\r
798 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
801 /*-----------------------------------------------------------*/
\r
803 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
\r
804 IotMqttConnection_t mqttConnection,
\r
805 IotMqttSerializeSubscribe_t serializeSubscription,
\r
806 const IotMqttSubscription_t * pSubscriptionList,
\r
807 size_t subscriptionCount,
\r
809 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
810 IotMqttOperation_t * const pOperationReference )
\r
812 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
813 _mqttOperation_t * pSubscriptionOperation = NULL;
\r
815 /* Create and serialize the subscription operation. */
\r
816 status = _subscriptionCreateAndSerialize( operation,
\r
818 serializeSubscription,
\r
823 &pSubscriptionOperation );
\r
825 if( status != IOT_MQTT_SUCCESS )
\r
827 IOT_GOTO_CLEANUP();
\r
834 /* Add the subscription list for a SUBSCRIBE. */
\r
835 if( operation == IOT_MQTT_SUBSCRIBE )
\r
837 status = _IotMqtt_AddSubscriptions( mqttConnection,
\r
838 pSubscriptionOperation->u.operation.packetIdentifier,
\r
840 subscriptionCount );
\r
842 if( status != IOT_MQTT_SUCCESS )
\r
844 IOT_GOTO_CLEANUP();
\r
852 /* Set the reference, if provided. */
\r
853 if( pOperationReference != NULL )
\r
855 *pOperationReference = pSubscriptionOperation;
\r
862 /* Send the SUBSCRIBE packet. */
\r
863 if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND )
\r
865 _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pSubscriptionOperation->job, pSubscriptionOperation );
\r
869 status = _IotMqtt_ScheduleOperation( pSubscriptionOperation,
\r
870 _IotMqtt_ProcessSend,
\r
873 if( status != IOT_MQTT_SUCCESS )
\r
875 IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",
\r
877 IotMqtt_OperationType( operation ) );
\r
879 if( operation == IOT_MQTT_SUBSCRIBE )
\r
881 _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,
\r
882 pSubscriptionOperation->u.operation.packetIdentifier,
\r
883 MQTT_REMOVE_ALL_SUBSCRIPTIONS );
\r
890 /* Clear the previously set (and now invalid) reference. */
\r
891 if( pOperationReference != NULL )
\r
893 *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;
\r
900 IOT_GOTO_CLEANUP();
\r
904 /* Clean up if this function failed. */
\r
905 IOT_FUNCTION_CLEANUP_BEGIN();
\r
907 if( status != IOT_MQTT_SUCCESS )
\r
909 if( pSubscriptionOperation != NULL )
\r
911 _IotMqtt_DestroyOperation( pSubscriptionOperation );
\r
920 status = IOT_MQTT_STATUS_PENDING;
\r
922 IotLogInfo( "(MQTT connection %p) %s operation scheduled.",
\r
924 IotMqtt_OperationType( operation ) );
\r
927 IOT_FUNCTION_CLEANUP_END();
\r
930 /*-----------------------------------------------------------*/
\r
932 bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )
\r
934 bool disconnected = false;
\r
936 /* Lock the mutex protecting the reference count. */
\r
937 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
939 /* Reference count must not be negative. */
\r
940 IotMqtt_Assert( pMqttConnection->references >= 0 );
\r
942 /* Read connection status. */
\r
943 disconnected = pMqttConnection->disconnected;
\r
945 /* Increment the connection's reference count if it is not disconnected. */
\r
946 if( disconnected == false )
\r
948 ( pMqttConnection->references )++;
\r
949 IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",
\r
951 ( long int ) pMqttConnection->references - 1,
\r
952 ( long int ) pMqttConnection->references );
\r
956 IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );
\r
959 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
961 return( disconnected == false );
\r
964 /*-----------------------------------------------------------*/
\r
966 void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )
\r
968 bool destroyConnection = false;
\r
970 /* Lock the mutex protecting the reference count. */
\r
971 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
973 /* Decrement reference count. It must not be negative. */
\r
974 ( pMqttConnection->references )--;
\r
975 IotMqtt_Assert( pMqttConnection->references >= 0 );
\r
977 IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",
\r
979 ( long int ) pMqttConnection->references + 1,
\r
980 ( long int ) pMqttConnection->references );
\r
982 /* Check if this connection may be destroyed. */
\r
983 if( pMqttConnection->references == 0 )
\r
985 destroyConnection = true;
\r
992 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
994 /* Destroy an unreferenced MQTT connection. */
\r
995 if( destroyConnection == true )
\r
997 IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",
\r
999 _destroyMqttConnection( pMqttConnection );
\r
1003 EMPTY_ELSE_MARKER;
\r
1007 /*-----------------------------------------------------------*/
\r
1009 IotMqttError_t IotMqtt_Init( void )
\r
1011 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
1012 uint32_t allowInitialization = Atomic_CompareAndSwap_u32( &_initCalled,
\r
1013 MQTT_LIBRARY_INITIALIZED,
\r
1014 MQTT_LIBRARY_UNINITIALIZED );
\r
1016 if( allowInitialization == 1 )
\r
1018 /* Call any additional serializer initialization function if serializer
\r
1019 * overrides are enabled. */
\r
1020 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1021 #ifdef _IotMqtt_InitSerializeAdditional
\r
1022 if( _IotMqtt_InitSerializeAdditional() == false )
\r
1024 /* Log initialization status. */
\r
1025 IotLogError( "Failed to initialize MQTT library serializer. " );
\r
1027 status = IOT_MQTT_INIT_FAILED;
\r
1031 EMPTY_ELSE_MARKER;
\r
1033 #endif /* ifdef _IotMqtt_InitSerializeAdditional */
\r
1034 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
1036 if( status == IOT_MQTT_SUCCESS )
\r
1038 IotLogInfo( "MQTT library successfully initialized." );
\r
1042 EMPTY_ELSE_MARKER;
\r
1047 IotLogWarn( "IotMqtt_Init called with library already initialized." );
\r
1053 /*-----------------------------------------------------------*/
\r
1055 void IotMqtt_Cleanup( void )
\r
1057 uint32_t allowCleanup = Atomic_CompareAndSwap_u32( &_initCalled,
\r
1058 MQTT_LIBRARY_UNINITIALIZED,
\r
1059 MQTT_LIBRARY_INITIALIZED );
\r
1061 if( allowCleanup == 1 )
\r
1063 /* Call any additional serializer cleanup initialization function if serializer
\r
1064 * overrides are enabled. */
\r
1065 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1066 #ifdef _IotMqtt_CleanupSerializeAdditional
\r
1067 _IotMqtt_CleanupSerializeAdditional();
\r
1071 IotLogInfo( "MQTT library cleanup done." );
\r
1075 IotLogWarn( "IotMqtt_Init was not called before IotMqtt_Cleanup." );
\r
1079 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_Connect: establishes a new MQTT connection and waits for the
 * CONNECT operation to complete.
 *
 * Steps visible in the body, in order:
 *  - check that the library is initialized (_checkInit) and validate
 *    pConnectInfo (_IotMqtt_ValidateConnect);
 *  - obtain a network connection through _createNetworkConnection;
 *  - create the MQTT connection object and register IotMqtt_ReceiveCallback
 *    as the network receive callback;
 *  - create a waitable CONNECT operation, add any previous session
 *    subscriptions, serialize the CONNECT packet and send it with
 *    _IotMqtt_ProcessSend;
 *  - wait with IotMqtt_Wait and, on success, schedule the first keep-alive job
 *    when keepAliveMs is nonzero.
 *
 * After the cleanup label, a status other than IOT_MQTT_SUCCESS is logged, the
 * network connection is closed if ownNetworkConnection is true, and any
 * operation or connection object still held is destroyed.
 *
 * pNetworkInfo     Network setup; supplies pNetworkInterface and pMqttSerializer.
 * pConnectInfo     CONNECT parameters read here: awsIotMqttMode,
 *                  keepAliveSeconds, pPreviousSubscriptions and
 *                  previousSubscriptionCount.
 * timeoutMs        Passed unchanged to IotMqtt_Wait for the CONNECT operation.
 * pMqttConnection  Output; written with the new connection object near the
 *                  end of the function.
 *
 * Status values set directly in this function: IOT_MQTT_NOT_INITIALIZED,
 * IOT_MQTT_BAD_PARAMETER, IOT_MQTT_NETWORK_ERROR, IOT_MQTT_NO_MEMORY and
 * IOT_MQTT_SCHEDULING_ERROR. Other values come from the helpers called
 * (operation creation, subscription restore, serializer, IotMqtt_Wait).
 */
1081 IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
\r
1082 const IotMqttConnectInfo_t * pConnectInfo,
\r
1083 uint32_t timeoutMs,
\r
1084 IotMqttConnection_t * const pMqttConnection )
\r
1086 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
1087 bool ownNetworkConnection = false;
\r
1088 IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
\r
1089 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
1090 IotNetworkConnection_t pNetworkConnection = { 0 };
\r
1091 _mqttOperation_t * pOperation = NULL;
\r
1092 _mqttConnection_t * pNewMqttConnection = NULL;
\r
1094 /* Check that IotMqtt_Init was called. */
\r
1095 if( _checkInit() == false )
\r
1097 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );
\r
1101 EMPTY_ELSE_MARKER;
\r
1104 /* Validate network interface and connect info. */
\r
1105 if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )
\r
1107 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1111 EMPTY_ELSE_MARKER;
\r
/* ownNetworkConnection is filled in by this call and is later used by the
 * cleanup section to decide whether the network connection must be closed. */
1114 networkStatus = _createNetworkConnection( pNetworkInfo,
\r
1115 &pNetworkConnection,
\r
1116 &ownNetworkConnection );
\r
1118 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
1120 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
\r
1124 EMPTY_ELSE_MARKER;
\r
1127 IotLogInfo( "Establishing new MQTT connection." );
\r
1129 /* Initialize a new MQTT connection object. */
\r
1130 pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,
\r
1132 pConnectInfo->keepAliveSeconds );
\r
1134 if( pNewMqttConnection == NULL )
\r
1136 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
1140 /* Set the network connection associated with the MQTT connection. */
\r
1141 pNewMqttConnection->pNetworkConnection = pNetworkConnection;
\r
1142 pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;
\r
1144 /* Set the MQTT packet serializer overrides. */
\r
1145 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1146 pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;
\r
1148 pNewMqttConnection->pSerializer = NULL;
\r
1152 /* Set the MQTT receive callback. */
\r
1153 networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,
\r
1154 IotMqtt_ReceiveCallback,
\r
1155 pNewMqttConnection );
\r
1157 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
1159 IotLogError( "Failed to set MQTT network receive callback." );
\r
1161 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
\r
1165 EMPTY_ELSE_MARKER;
\r
1168 /* Create a CONNECT operation. */
\r
1169 status = _IotMqtt_CreateOperation( pNewMqttConnection,
\r
1170 IOT_MQTT_FLAG_WAITABLE,
\r
1174 if( status != IOT_MQTT_SUCCESS )
\r
1176 IOT_GOTO_CLEANUP();
\r
1180 EMPTY_ELSE_MARKER;
\r
1183 /* Ensure the members set by operation creation and serialization
\r
1184 * are appropriate for a blocking CONNECT. */
\r
1185 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1186 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
\r
1187 == IOT_MQTT_FLAG_WAITABLE );
\r
1188 IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
\r
1190 /* Set the operation type. */
\r
1191 pOperation->u.operation.type = IOT_MQTT_CONNECT;
\r
1193 /* Add previous session subscriptions. */
\r
1194 if( pConnectInfo->pPreviousSubscriptions != NULL )
\r
1196 /* Previous subscription count should have been validated as nonzero. */
\r
1197 IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );
\r
1199 status = _IotMqtt_AddSubscriptions( pNewMqttConnection,
\r
1201 pConnectInfo->pPreviousSubscriptions,
\r
1202 pConnectInfo->previousSubscriptionCount );
\r
1204 if( status != IOT_MQTT_SUCCESS )
\r
1206 IOT_GOTO_CLEANUP();
\r
1210 EMPTY_ELSE_MARKER;
\r
1215 EMPTY_ELSE_MARKER;
\r
1218 /* Convert the connect info and will info objects to an MQTT CONNECT packet. */
\r
1219 status = _getMqttConnectSerializer( pNetworkInfo->pMqttSerializer )( pConnectInfo,
\r
1220 &( pOperation->u.operation.pMqttPacket ),
\r
1221 &( pOperation->u.operation.packetSize ) );
\r
1223 if( status != IOT_MQTT_SUCCESS )
\r
1225 IOT_GOTO_CLEANUP();
\r
1229 EMPTY_ELSE_MARKER;
\r
1232 /* Check the serialized MQTT packet. */
\r
1233 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1234 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1236 /* Send the CONNECT packet. */
\r
1237 _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );
\r
1239 /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */
\r
1240 status = IotMqtt_Wait( pOperation, timeoutMs );
\r
/* NOTE(review): the comment opened on the next line has no terminator in this
 * copy, so the pOperation = NULL statement after it is lexically swallowed
 * until the next comment end. Restore the missing comment line. */
1242 /* The call to wait cleans up the CONNECT operation, so set the pointer
\r
1244 pOperation = NULL;
\r
1246 /* When a connection is successfully established, schedule keep-alive job. */
\r
1247 if( status == IOT_MQTT_SUCCESS )
\r
1249 /* Check if a keep-alive job should be scheduled. */
\r
1250 if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
\r
1252 IotLogDebug( "Scheduling first MQTT keep-alive job." );
\r
1254 taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
\r
1255 pNewMqttConnection->pingreq.job,
\r
1256 pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );
\r
1258 if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
\r
1260 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );
\r
1264 EMPTY_ELSE_MARKER;
\r
1269 EMPTY_ELSE_MARKER;
\r
1274 EMPTY_ELSE_MARKER;
\r
1277 IOT_FUNCTION_CLEANUP_BEGIN();
\r
1279 if( status != IOT_MQTT_SUCCESS )
\r
1281 IotLogError( "Failed to establish new MQTT connection, error %s.",
\r
1282 IotMqtt_strerror( status ) );
\r
1284 /* The network connection must be closed if it was created. */
\r
1285 if( ownNetworkConnection == true )
\r
1287 networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );
\r
1289 if( networkStatus != IOT_NETWORK_SUCCESS )
\r
1291 IotLogWarn( "Failed to close network connection." );
\r
1295 IotLogInfo( "Network connection closed on error." );
\r
1300 EMPTY_ELSE_MARKER;
\r
/* pOperation is only non-NULL here if the failure happened before the wait. */
1303 if( pOperation != NULL )
\r
1305 _IotMqtt_DestroyOperation( pOperation );
\r
1309 EMPTY_ELSE_MARKER;
\r
1312 if( pNewMqttConnection != NULL )
\r
1314 _destroyMqttConnection( pNewMqttConnection );
\r
1318 EMPTY_ELSE_MARKER;
\r
/* NOTE(review): this prints pMqttConnection, the output-parameter pointer,
 * not pNewMqttConnection; confirm which address is meant to be logged. */
1323 IotLogInfo( "New MQTT connection %p established.", pMqttConnection );
\r
1325 /* Set the output parameter. */
\r
1326 *pMqttConnection = pNewMqttConnection;
\r
1329 IOT_FUNCTION_CLEANUP_END();
\r
1332 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_Disconnect: closes an MQTT connection and drops one reference to it.
 *
 * A DISCONNECT packet is only attempted when the library is initialized, the
 * IOT_MQTT_FLAG_CLEANUP_ONLY bit is not set in flags, and the connection is
 * not already marked disconnected. In that case a waitable DISCONNECT
 * operation is created, serialized, sent through _IotMqtt_ProcessSend and
 * waited on for IOT_MQTT_RESPONSE_WAIT_MS.
 *
 * The cleanup section runs on every path. When the library is initialized it
 * closes the network connection, empties the pendingProcessing and
 * pendingResponse lists while holding referencesMutex, and decrements the
 * connection reference count.
 *
 * mqttConnection  Connection to disconnect. No NULL check is visible before
 *                 it is dereferenced.
 * flags           Tested against IOT_MQTT_FLAG_CLEANUP_ONLY below.
 *                 NOTE(review): its declaration is not visible in this copy
 *                 of the signature.
 *
 * Nothing is returned; a failure to send DISCONNECT is only logged.
 */
1334 void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
\r
1337 bool disconnected = false, initCalled = false;
\r
1338 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1339 _mqttOperation_t * pOperation = NULL;
\r
1341 /* Check that IotMqtt_Init was called. */
\r
1342 initCalled = _checkInit();
\r
1344 if( initCalled == false )
\r
1346 IOT_GOTO_CLEANUP();
\r
1350 EMPTY_ELSE_MARKER;
\r
1353 /* Only send a DISCONNECT packet if the connection is active and the "cleanup only"
\r
1354 * flag is not set. */
\r
1355 if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == IOT_MQTT_FLAG_CLEANUP_ONLY )
\r
1357 IOT_GOTO_CLEANUP();
\r
1360 /* Read the connection status. */
\r
1361 IotMutex_Lock( &( mqttConnection->referencesMutex ) );
\r
1362 disconnected = mqttConnection->disconnected;
\r
1363 IotMutex_Unlock( &( mqttConnection->referencesMutex ) );
\r
1365 if( disconnected == true )
\r
1367 IOT_GOTO_CLEANUP();
\r
1370 IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );
\r
1372 /* Create a DISCONNECT operation. This function blocks until the DISCONNECT
\r
1373 * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */
\r
1374 status = _IotMqtt_CreateOperation( mqttConnection,
\r
1375 IOT_MQTT_FLAG_WAITABLE,
\r
1379 if( status == IOT_MQTT_SUCCESS )
\r
1381 /* Ensure that the members set by operation creation and serialization
\r
1382 * are appropriate for a blocking DISCONNECT. */
\r
1383 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1384 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
\r
1385 == IOT_MQTT_FLAG_WAITABLE );
\r
1386 IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
\r
1388 /* Set the operation type. */
\r
1389 pOperation->u.operation.type = IOT_MQTT_DISCONNECT;
\r
1391 /* Generate a DISCONNECT packet. */
\r
1392 status = _getMqttDisconnectSerializer( mqttConnection->pSerializer )( &( pOperation->u.operation.pMqttPacket ),
\r
1393 &( pOperation->u.operation.packetSize ) );
\r
1397 EMPTY_ELSE_MARKER;
\r
/* status is re-tested so that a serializer failure above skips the send. */
1400 if( status == IOT_MQTT_SUCCESS )
\r
1402 /* Check the serialized MQTT packet. */
\r
1403 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1404 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1406 /* Send the DISCONNECT packet. */
\r
1407 _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );
\r
1409 /* Wait a short time for the DISCONNECT packet to be transmitted. */
\r
1410 status = IotMqtt_Wait( pOperation,
\r
1411 IOT_MQTT_RESPONSE_WAIT_MS );
\r
1413 /* A wait on DISCONNECT should only ever return SUCCESS, TIMEOUT,
\r
1414 * or NETWORK ERROR. */
\r
1415 if( status == IOT_MQTT_SUCCESS )
\r
1417 IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );
\r
1421 IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||
\r
1422 ( status == IOT_MQTT_NETWORK_ERROR ) );
\r
1424 IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",
\r
1426 IotMqtt_strerror( status ) );
\r
1431 EMPTY_ELSE_MARKER;
\r
1434 /* This function has no return value and no cleanup, but uses the cleanup
\r
1435 * label to exit on error. */
\r
1436 IOT_FUNCTION_CLEANUP_BEGIN();
\r
/* Nothing is torn down when the library was never initialized. */
1438 if( initCalled == true )
\r
1440 /* Close the underlying network connection. This also cleans up keep-alive. */
\r
1441 _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,
\r
1444 /* Check if the connection may be destroyed. */
\r
1445 IotMutex_Lock( &( mqttConnection->referencesMutex ) );
\r
1447 /* At this point, the connection should be marked disconnected. */
\r
1448 IotMqtt_Assert( mqttConnection->disconnected == true );
\r
1450 /* Attempt cancel and destroy each operation in the connection's lists. */
\r
1451 IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),
\r
1452 _mqttOperation_tryDestroy,
\r
1453 offsetof( _mqttOperation_t, link ) );
\r
1455 IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),
\r
1456 _mqttOperation_tryDestroy,
\r
1457 offsetof( _mqttOperation_t, link ) );
\r
1459 IotMutex_Unlock( &( mqttConnection->referencesMutex ) );
\r
1461 /* Decrement the connection reference count and destroy it if possible. */
\r
1462 _IotMqtt_DecrementConnectionReferences( mqttConnection );
\r
1466 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_SubscribeAsync: asynchronous entry point for an MQTT SUBSCRIBE.
 * IotMqtt_SubscribeSync builds on it and performs the wait.
 *
 * The request first goes through _subscriptionCommonSetup. Only when that
 * returns IOT_MQTT_SUCCESS is it handed to _subscriptionCommon, together with
 * the SUBSCRIBE serializer selected from mqttConnection->pSerializer.
 *
 * mqttConnection       Connection the subscriptions belong to.
 * pSubscriptionList    Array of subscriptions to request.
 * subscriptionCount    Number of entries in pSubscriptionList.
 * pCallbackInfo        Completion callback information; presumably forwarded
 *                      to the helpers, the forwarding argument is not visible
 *                      in this copy.
 * pSubscribeOperation  Output reference to the created operation.
 *
 * NOTE(review): a flags parameter is passed by IotMqtt_SubscribeSync, but its
 * declaration and the final return statement are not visible in this copy.
 * The status computed below is presumably what is returned.
 */
1468 IotMqttError_t IotMqtt_SubscribeAsync( IotMqttConnection_t mqttConnection,
\r
1469 const IotMqttSubscription_t * pSubscriptionList,
\r
1470 size_t subscriptionCount,
\r
1472 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1473 IotMqttOperation_t * const pSubscribeOperation )
\r
1475 IotMqttError_t status = _subscriptionCommonSetup( IOT_MQTT_SUBSCRIBE,
\r
1477 pSubscriptionList,
\r
1478 subscriptionCount,
\r
1480 pSubscribeOperation );
\r
/* The shared subscribe path only runs when the setup step succeeded. */
1482 if( IOT_MQTT_SUCCESS == status )
\r
1484 status = _subscriptionCommon( IOT_MQTT_SUBSCRIBE,
\r
1486 _getMqttSubscribeSerializer( mqttConnection->pSerializer ),
\r
1487 pSubscriptionList,
\r
1488 subscriptionCount,
\r
1491 pSubscribeOperation );
\r
1495 EMPTY_ELSE_MARKER;
\r
1501 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_SubscribeSync: blocking SUBSCRIBE built on IotMqtt_SubscribeAsync.
 *
 * Calls IotMqtt_SubscribeAsync with IOT_MQTT_FLAG_WAITABLE combined with
 * MQTT_INTERNAL_FLAG_BLOCK_ON_SEND. If that returns IOT_MQTT_STATUS_PENDING,
 * it waits on the resulting operation with IotMqtt_Wait.
 *
 * mqttConnection     Connection the subscriptions belong to.
 * pSubscriptionList  Array of subscriptions to request.
 * subscriptionCount  Number of entries in pSubscriptionList.
 * timeoutMs          Passed unchanged to IotMqtt_Wait.
 *
 * The final status is asserted to differ from IOT_MQTT_STATUS_PENDING. The
 * return statement itself is not visible in this copy; presumably status is
 * returned.
 */
1503 IotMqttError_t IotMqtt_SubscribeSync( IotMqttConnection_t mqttConnection,
\r
1504 const IotMqttSubscription_t * pSubscriptionList,
\r
1505 size_t subscriptionCount,
\r
1507 uint32_t timeoutMs )
\r
1509 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1510 IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1512 /* Flags are not used, but the parameter is present for future compatibility. */
\r
1515 /* Call the asynchronous SUBSCRIBE function. */
\r
1516 status = IotMqtt_SubscribeAsync( mqttConnection,
\r
1517 pSubscriptionList,
\r
1518 subscriptionCount,
\r
1519 IOT_MQTT_FLAG_WAITABLE | MQTT_INTERNAL_FLAG_BLOCK_ON_SEND,
\r
1521 &subscribeOperation );
\r
1523 /* Wait for the SUBSCRIBE operation to complete. */
\r
1524 if( status == IOT_MQTT_STATUS_PENDING )
\r
1526 status = IotMqtt_Wait( subscribeOperation, timeoutMs );
\r
1530 EMPTY_ELSE_MARKER;
\r
1533 /* Ensure that a status was set. */
\r
1534 IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );
\r
1539 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_UnsubscribeAsync: asynchronous entry point for an MQTT UNSUBSCRIBE.
 * IotMqtt_UnsubscribeSync builds on it and performs the wait.
 *
 * The request first goes through _subscriptionCommonSetup. When that returns
 * IOT_MQTT_SUCCESS, the listed topic filters are removed from the connection
 * with _IotMqtt_RemoveSubscriptionByTopicFilter, and the request is then
 * handed to _subscriptionCommon with the UNSUBSCRIBE serializer selected from
 * mqttConnection->pSerializer. Removal happens before the packet is handed
 * on, so it does not depend on the result of _subscriptionCommon.
 *
 * mqttConnection         Connection the subscriptions belong to.
 * pSubscriptionList      Array of subscriptions to remove.
 * subscriptionCount      Number of entries in pSubscriptionList.
 * pCallbackInfo          Completion callback information; presumably
 *                        forwarded to the helpers, the forwarding argument is
 *                        not visible in this copy.
 * pUnsubscribeOperation  Output reference to the created operation.
 *
 * NOTE(review): a flags parameter is passed by IotMqtt_UnsubscribeSync, but
 * its declaration and the final return statement are not visible in this
 * copy. The status computed below is presumably what is returned.
 */
1541 IotMqttError_t IotMqtt_UnsubscribeAsync( IotMqttConnection_t mqttConnection,
\r
1542 const IotMqttSubscription_t * pSubscriptionList,
\r
1543 size_t subscriptionCount,
\r
1545 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1546 IotMqttOperation_t * const pUnsubscribeOperation )
\r
1548 IotMqttError_t status = _subscriptionCommonSetup( IOT_MQTT_UNSUBSCRIBE,
\r
1550 pSubscriptionList,
\r
1551 subscriptionCount,
\r
1553 pUnsubscribeOperation );
\r
1555 if( IOT_MQTT_SUCCESS == status )
\r
1557 /* Remove the MQTT subscription list for an UNSUBSCRIBE. */
\r
1558 _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,
\r
1559 pSubscriptionList,
\r
1560 subscriptionCount );
\r
1562 status = _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,
\r
1564 _getMqttUnsubscribeSerializer( mqttConnection->pSerializer ),
\r
1565 pSubscriptionList,
\r
1566 subscriptionCount,
\r
1569 pUnsubscribeOperation );
\r
1573 EMPTY_ELSE_MARKER;
\r
1579 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_UnsubscribeSync: blocking UNSUBSCRIBE built on
 * IotMqtt_UnsubscribeAsync.
 *
 * Calls IotMqtt_UnsubscribeAsync with IOT_MQTT_FLAG_WAITABLE combined with
 * MQTT_INTERNAL_FLAG_BLOCK_ON_SEND. If that returns IOT_MQTT_STATUS_PENDING,
 * it waits on the resulting operation with IotMqtt_Wait.
 *
 * mqttConnection     Connection the subscriptions belong to.
 * pSubscriptionList  Array of subscriptions to remove.
 * subscriptionCount  Number of entries in pSubscriptionList.
 * timeoutMs          Passed unchanged to IotMqtt_Wait.
 *
 * The final status is asserted to differ from IOT_MQTT_STATUS_PENDING. The
 * return statement itself is not visible in this copy; presumably status is
 * returned.
 */
1581 IotMqttError_t IotMqtt_UnsubscribeSync( IotMqttConnection_t mqttConnection,
\r
1582 const IotMqttSubscription_t * pSubscriptionList,
\r
1583 size_t subscriptionCount,
\r
1585 uint32_t timeoutMs )
\r
1587 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1588 IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1590 /* Flags are not used, but the parameter is present for future compatibility. */
\r
1593 /* Call the asynchronous UNSUBSCRIBE function. */
\r
1594 status = IotMqtt_UnsubscribeAsync( mqttConnection,
\r
1595 pSubscriptionList,
\r
1596 subscriptionCount,
\r
1597 IOT_MQTT_FLAG_WAITABLE | MQTT_INTERNAL_FLAG_BLOCK_ON_SEND,
\r
1599 &unsubscribeOperation );
\r
1601 /* Wait for the UNSUBSCRIBE operation to complete. */
\r
1602 if( status == IOT_MQTT_STATUS_PENDING )
\r
1604 status = IotMqtt_Wait( unsubscribeOperation, timeoutMs );
\r
1608 EMPTY_ELSE_MARKER;
\r
1611 /* Ensure that a status was set. */
\r
1612 IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );
\r
1617 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_PublishAsync: creates, serializes and sends (or schedules) an MQTT
 * PUBLISH.
 *
 * Steps visible in the body, in order:
 *  - check that the library is initialized and validate pPublishInfo with
 *    _IotMqtt_ValidatePublish;
 *  - for QoS 0, reject a callback or the waitable flag, and warn if a
 *    reference pointer was supplied;
 *  - for a waitable operation, require a non-NULL pPublishOperation;
 *  - create the operation, serialize the PUBLISH packet, and copy retryLimit
 *    and retryMs into the operation for a non-QoS-0 publish with retries;
 *  - store the operation in *pPublishOperation for a non-QoS-0 publish;
 *  - either call _IotMqtt_ProcessSend directly, when
 *    MQTT_INTERNAL_FLAG_BLOCK_ON_SEND is set, or go through
 *    _IotMqtt_ScheduleOperation.
 *
 * After the cleanup label, a failed call destroys the operation it created.
 * On the QoS greater than 0 path the status is set to IOT_MQTT_STATUS_PENDING.
 *
 * mqttConnection     Connection to publish on; its awsIotMqttMode and
 *                    pSerializer members are read.
 * pPublishInfo       Message parameters; qos, retryLimit and retryMs are read
 *                    here.
 * pCallbackInfo      Completion callback information; must be NULL for QoS 0.
 * pPublishOperation  Optional output reference; required when the waitable
 *                    flag is set.
 *
 * NOTE(review): flags is tested throughout the body, but its declaration is
 * not visible in this copy of the signature.
 */
1619 IotMqttError_t IotMqtt_PublishAsync( IotMqttConnection_t mqttConnection,
\r
1620 const IotMqttPublishInfo_t * pPublishInfo,
\r
1622 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
1623 IotMqttOperation_t * const pPublishOperation )
\r
1625 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
1626 _mqttOperation_t * pOperation = NULL;
\r
1627 uint8_t ** pPacketIdentifierHigh = NULL;
\r
1629 /* Check that IotMqtt_Init was called. */
\r
1630 if( _checkInit() == false )
\r
1632 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );
\r
1636 EMPTY_ELSE_MARKER;
\r
1639 /* Check that the PUBLISH information is valid. */
\r
1640 if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,
\r
1641 pPublishInfo ) == false )
\r
1643 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1647 EMPTY_ELSE_MARKER;
\r
1650 /* Check that no notification is requested for a QoS 0 publish. */
\r
1651 if( pPublishInfo->qos == IOT_MQTT_QOS_0 )
\r
1653 if( pCallbackInfo != NULL )
\r
1655 IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );
\r
1657 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
/* The waitable flag is rejected for QoS 0 with the same message as a callback. */
1659 else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )
\r
1661 IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );
\r
1663 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1667 EMPTY_ELSE_MARKER;
\r
1670 if( pPublishOperation != NULL )
\r
1672 IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );
\r
1676 EMPTY_ELSE_MARKER;
\r
1681 EMPTY_ELSE_MARKER;
\r
1684 /* Check that a reference pointer is provided for a waitable operation. */
\r
1685 if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
\r
1687 if( pPublishOperation == NULL )
\r
1689 IotLogError( "Reference must be provided for a waitable PUBLISH." );
\r
1691 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
1695 EMPTY_ELSE_MARKER;
\r
1700 EMPTY_ELSE_MARKER;
\r
1703 /* Create a PUBLISH operation. */
\r
1704 status = _IotMqtt_CreateOperation( mqttConnection,
\r
1709 if( status != IOT_MQTT_SUCCESS )
\r
1711 IOT_GOTO_CLEANUP();
\r
1715 EMPTY_ELSE_MARKER;
\r
1718 /* Check the PUBLISH operation data and set the operation type. */
\r
1719 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
1720 pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;
\r
1722 /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */
\r
1723 if( mqttConnection->awsIotMqttMode == true )
\r
1725 pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );
\r
1729 EMPTY_ELSE_MARKER;
\r
1732 /* Generate a PUBLISH packet from pPublishInfo. */
\r
1733 status = _getMqttPublishSerializer( mqttConnection->pSerializer )( pPublishInfo,
\r
1734 &( pOperation->u.operation.pMqttPacket ),
\r
1735 &( pOperation->u.operation.packetSize ),
\r
1736 &( pOperation->u.operation.packetIdentifier ),
\r
1737 pPacketIdentifierHigh );
\r
1739 if( status != IOT_MQTT_SUCCESS )
\r
1741 IOT_GOTO_CLEANUP();
\r
1745 EMPTY_ELSE_MARKER;
\r
1748 /* Check the serialized MQTT packet. */
\r
1749 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
1750 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
\r
1752 /* Initialize PUBLISH retry if retryLimit is set. */
\r
1753 if( pPublishInfo->retryLimit > 0 )
\r
1755 /* A QoS 0 PUBLISH may not be retried. */
\r
1756 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1758 pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;
\r
1759 pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;
\r
1763 EMPTY_ELSE_MARKER;
\r
1768 EMPTY_ELSE_MARKER;
\r
1771 /* Set the reference, if provided. */
\r
1772 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1774 if( pPublishOperation != NULL )
\r
1776 *pPublishOperation = pOperation;
\r
1780 EMPTY_ELSE_MARKER;
\r
1785 EMPTY_ELSE_MARKER;
\r
1788 /* Send the PUBLISH packet. */
\r
1789 if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND )
\r
1791 _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );
\r
1795 status = _IotMqtt_ScheduleOperation( pOperation,
\r
1796 _IotMqtt_ProcessSend,
\r
1799 if( status != IOT_MQTT_SUCCESS )
\r
1801 IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",
\r
1804 /* Clear the previously set (and now invalid) reference. */
\r
1805 if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
\r
1807 if( pPublishOperation != NULL )
\r
1809 *pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;
\r
1813 EMPTY_ELSE_MARKER;
\r
1818 EMPTY_ELSE_MARKER;
\r
1821 IOT_GOTO_CLEANUP();
\r
1825 EMPTY_ELSE_MARKER;
\r
1829 /* Clean up the PUBLISH operation if this function fails. Otherwise, set the
\r
1830 * appropriate return code based on QoS. */
\r
1831 IOT_FUNCTION_CLEANUP_BEGIN();
\r
1833 if( status != IOT_MQTT_SUCCESS )
\r
1835 if( pOperation != NULL )
\r
1837 _IotMqtt_DestroyOperation( pOperation );
\r
1841 EMPTY_ELSE_MARKER;
\r
/* A publish above QoS 0 reports PENDING instead of SUCCESS. */
1846 if( pPublishInfo->qos > IOT_MQTT_QOS_0 )
\r
1848 status = IOT_MQTT_STATUS_PENDING;
\r
1852 EMPTY_ELSE_MARKER;
\r
1855 IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",
\r
1859 IOT_FUNCTION_CLEANUP_END();
\r
1862 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_PublishSync: blocking PUBLISH built on IotMqtt_PublishAsync.
 *
 * The incoming flags value is overwritten with
 * MQTT_INTERNAL_FLAG_BLOCK_ON_SEND. For a QoS 1 message,
 * IOT_MQTT_FLAG_WAITABLE is added and a local operation reference is passed
 * to IotMqtt_PublishAsync; for other QoS values the reference stays NULL.
 * When a QoS 1 publish returns IOT_MQTT_STATUS_PENDING, the function waits on
 * it with IotMqtt_Wait.
 *
 * mqttConnection  Connection to publish on.
 * pPublishInfo    Message parameters; qos is read here.
 * timeoutMs       Passed unchanged to IotMqtt_Wait for a QoS 1 publish.
 *
 * NOTE(review): the flags declaration and the final return statement are not
 * visible in this copy; presumably status is returned.
 */
1864 IotMqttError_t IotMqtt_PublishSync( IotMqttConnection_t mqttConnection,
\r
1865 const IotMqttPublishInfo_t * pPublishInfo,
\r
1867 uint32_t timeoutMs )
\r
1869 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
1870 IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER,
\r
1871 * pPublishOperation = NULL;
\r
1873 /* Clear the flags, setting only the "serial" flag. */
\r
1874 flags = MQTT_INTERNAL_FLAG_BLOCK_ON_SEND;
\r
/* NOTE(review): pPublishInfo is dereferenced here before IotMqtt_PublishAsync
 * gets a chance to validate it; a NULL pPublishInfo would be dereferenced at
 * this point. Confirm whether a guard is wanted. */
1876 /* Set the waitable flag and reference for QoS 1 PUBLISH. */
\r
1877 if( pPublishInfo->qos == IOT_MQTT_QOS_1 )
\r
1879 flags |= IOT_MQTT_FLAG_WAITABLE;
\r
1880 pPublishOperation = &publishOperation;
\r
1884 EMPTY_ELSE_MARKER;
\r
1887 /* Call the asynchronous PUBLISH function. */
\r
1888 status = IotMqtt_PublishAsync( mqttConnection,
\r
1892 pPublishOperation );
\r
1894 /* Wait for a queued QoS 1 PUBLISH to complete. */
\r
1895 if( pPublishInfo->qos == IOT_MQTT_QOS_1 )
\r
1897 if( status == IOT_MQTT_STATUS_PENDING )
\r
1899 status = IotMqtt_Wait( publishOperation, timeoutMs );
\r
1903 EMPTY_ELSE_MARKER;
\r
1908 EMPTY_ELSE_MARKER;
\r
1914 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_Wait: blocks on a waitable operation until it completes or
 * timeoutMs expires, then releases the reference held for the wait.
 *
 * Steps visible in the body, in order:
 *  - check that the library is initialized and validate the operation
 *    reference with _IotMqtt_ValidateOperation;
 *  - under referencesMutex, check whether the connection is already marked
 *    disconnected; if so the status becomes IOT_MQTT_NETWORK_ERROR and no
 *    wait is performed;
 *  - otherwise wait on the operation semaphore with IotSemaphore_TimedWait.
 *    On timeout the status becomes IOT_MQTT_TIMEOUT, the operation job is
 *    cancelled through _IotMqtt_DecrementOperationReferences, and a timed-out
 *    SUBSCRIBE has its subscriptions removed by packet identifier. When the
 *    semaphore is obtained, the status is read from the operation;
 *  - finally decrement the operation reference count and destroy the
 *    operation when that call returns true.
 *
 * operation  Reference to the operation to wait on.
 * timeoutMs  Passed to IotSemaphore_TimedWait.
 *
 * Status values set directly in this function: IOT_MQTT_NOT_INITIALIZED,
 * IOT_MQTT_BAD_PARAMETER, IOT_MQTT_NETWORK_ERROR and IOT_MQTT_TIMEOUT;
 * otherwise the status stored in the operation.
 */
1916 IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,
\r
1917 uint32_t timeoutMs )
\r
1919 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
1920 _mqttConnection_t * pMqttConnection = NULL;
\r
1922 /* Check that IotMqtt_Init was called. */
\r
1923 if( _checkInit() == false )
\r
1925 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );
\r
1929 EMPTY_ELSE_MARKER;
\r
1932 /* Validate the given operation reference. */
\r
1933 if( _IotMqtt_ValidateOperation( operation ) == false )
\r
1935 status = IOT_MQTT_BAD_PARAMETER;
\r
1939 EMPTY_ELSE_MARKER;
\r
/* NOTE(review): operation is dereferenced below even when the validation
 * above has just set IOT_MQTT_BAD_PARAMETER. If _IotMqtt_ValidateOperation
 * rejects NULL references, this read would dereference NULL; confirm and
 * guard. */
1942 /* Check the MQTT connection status. */
\r
1943 pMqttConnection = operation->pMqttConnection;
\r
1945 if( status == IOT_MQTT_SUCCESS )
\r
1947 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
1949 if( pMqttConnection->disconnected == true )
\r
1951 IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "
\r
1952 "Operation cannot be waited on.",
\r
1954 IotMqtt_OperationType( operation->u.operation.type ),
\r
1957 status = IOT_MQTT_NETWORK_ERROR;
\r
1961 IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",
\r
1963 IotMqtt_OperationType( operation->u.operation.type ),
\r
1967 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
1969 /* Only wait on an operation if the MQTT connection is active. */
\r
1970 if( status == IOT_MQTT_SUCCESS )
\r
1972 if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),
\r
1973 timeoutMs ) == false )
\r
1975 status = IOT_MQTT_TIMEOUT;
\r
1977 /* Attempt to cancel the job of the timed out operation. */
\r
1978 ( void ) _IotMqtt_DecrementOperationReferences( operation, true );
\r
1980 /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */
\r
1981 if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )
\r
1983 IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"
\r
1984 " subscriptions of timed-out SUBSCRIBE.",
\r
1988 _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,
\r
1989 operation->u.operation.packetIdentifier,
\r
1990 MQTT_REMOVE_ALL_SUBSCRIPTIONS );
\r
1994 EMPTY_ELSE_MARKER;
\r
1999 /* Retrieve the status of the completed operation. */
\r
2000 status = operation->u.operation.status;
\r
2003 IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",
\r
2005 IotMqtt_OperationType( operation->u.operation.type ),
\r
2007 IotMqtt_strerror( status ) );
\r
2011 EMPTY_ELSE_MARKER;
\r
2014 /* Wait is finished; decrement operation reference count. */
\r
2015 if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )
\r
2017 _IotMqtt_DestroyOperation( operation );
\r
2021 EMPTY_ELSE_MARKER;
\r
2026 EMPTY_ELSE_MARKER;
\r
2029 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
2032 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_strerror: maps an IotMqttError_t value to a constant, human-readable
 * string literal for logging.
 *
 * status  Status code to describe.
 *
 * Each case below assigns the text for one status code to pMessage. The last
 * assignment, INVALID STATUS, is presumably the fallback for unrecognized
 * values.
 *
 * NOTE(review): the switch header, the break statements, the default label
 * and the return statement are not visible in this copy of the function;
 * presumably pMessage is returned.
 */
2034 const char * IotMqtt_strerror( IotMqttError_t status )
\r
2036 const char * pMessage = NULL;
\r
2040 case IOT_MQTT_SUCCESS:
\r
2041 pMessage = "SUCCESS";
\r
2044 case IOT_MQTT_STATUS_PENDING:
\r
2045 pMessage = "PENDING";
\r
2048 case IOT_MQTT_INIT_FAILED:
\r
2049 pMessage = "INITIALIZATION FAILED";
\r
2052 case IOT_MQTT_BAD_PARAMETER:
\r
2053 pMessage = "BAD PARAMETER";
\r
2056 case IOT_MQTT_NO_MEMORY:
\r
2057 pMessage = "NO MEMORY";
\r
2060 case IOT_MQTT_NETWORK_ERROR:
\r
2061 pMessage = "NETWORK ERROR";
\r
2064 case IOT_MQTT_SCHEDULING_ERROR:
\r
2065 pMessage = "SCHEDULING ERROR";
\r
2068 case IOT_MQTT_BAD_RESPONSE:
\r
2069 pMessage = "BAD RESPONSE RECEIVED";
\r
2072 case IOT_MQTT_TIMEOUT:
\r
2073 pMessage = "TIMEOUT";
\r
2076 case IOT_MQTT_SERVER_REFUSED:
\r
2077 pMessage = "SERVER REFUSED";
\r
2080 case IOT_MQTT_RETRY_NO_RESPONSE:
\r
2081 pMessage = "NO RESPONSE";
\r
2084 case IOT_MQTT_NOT_INITIALIZED:
\r
2085 pMessage = "NOT INITIALIZED";
\r
/* Presumably the default branch; its label is not visible in this copy. */
2089 pMessage = "INVALID STATUS";
\r
2096 /*-----------------------------------------------------------*/
\r
/*
 * IotMqtt_OperationType: maps an IotMqttOperationType_t value to a constant
 * string literal naming the operation, for use in log messages.
 *
 * operation  Operation type to describe.
 *
 * IOT_MQTT_PUBLISH_TO_SERVER is reported as PUBLISH; the other cases use the
 * packet name directly. The last assignment, INVALID OPERATION, is presumably
 * the fallback for unrecognized values.
 *
 * NOTE(review): the break statements, the default label and the return
 * statement are not visible in this copy of the function; presumably
 * pMessage is returned.
 */
2098 const char * IotMqtt_OperationType( IotMqttOperationType_t operation )
\r
2100 const char * pMessage = NULL;
\r
2102 switch( operation )
\r
2104 case IOT_MQTT_CONNECT:
\r
2105 pMessage = "CONNECT";
\r
2108 case IOT_MQTT_PUBLISH_TO_SERVER:
\r
2109 pMessage = "PUBLISH";
\r
2112 case IOT_MQTT_PUBACK:
\r
2113 pMessage = "PUBACK";
\r
2116 case IOT_MQTT_SUBSCRIBE:
\r
2117 pMessage = "SUBSCRIBE";
\r
2120 case IOT_MQTT_UNSUBSCRIBE:
\r
2121 pMessage = "UNSUBSCRIBE";
\r
2124 case IOT_MQTT_PINGREQ:
\r
2125 pMessage = "PINGREQ";
\r
2128 case IOT_MQTT_DISCONNECT:
\r
2129 pMessage = "DISCONNECT";
\r
/* Presumably the default branch; its label is not visible in this copy. */
2133 pMessage = "INVALID OPERATION";
\r
2140 /*-----------------------------------------------------------*/
\r
2142 /* Provide access to internal functions and variables if testing. */
\r
2143 #if IOT_BUILD_TESTS == 1
\r
2144 #include "iot_test_access_mqtt_api.c"
\r