]> git.sur5r.net Git - freertos/blob - FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
Add the Labs projects provided in the V10.2.1_191129 zip file.
[freertos] / FreeRTOS-Labs / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_api.c
1 /*\r
2  * IoT MQTT V2.1.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  */\r
22 \r
23 /**\r
24  * @file iot_mqtt_api.c\r
25  * @brief Implements most user-facing functions of the MQTT library.\r
26  */\r
27 \r
28 /* The config header is always included first. */\r
29 #include "iot_config.h"\r
30 \r
31 /* Standard includes. */\r
32 #include <string.h>\r
33 \r
34 /* Error handling include. */\r
35 #include "iot_error.h"\r
36 \r
37 /* MQTT internal include. */\r
38 #include "private/iot_mqtt_internal.h"\r
39 \r
40 /* Platform layer includes. */\r
41 #include "platform/iot_clock.h"\r
42 #include "platform/iot_threads.h"\r
43 \r
44 /* Atomics include. */\r
45 #include "iot_atomic.h"\r
46 \r
47 /* Validate MQTT configuration settings. */\r
48 #if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1\r
49     #error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1."\r
50 #endif\r
51 #if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1\r
52     #error "IOT_MQTT_ENABLE_METRICS must be 0 or 1."\r
53 #endif\r
54 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1\r
55     #error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1."\r
56 #endif\r
57 #if IOT_MQTT_RESPONSE_WAIT_MS <= 0\r
58     #error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative."\r
59 #endif\r
60 #if IOT_MQTT_RETRY_MS_CEILING <= 0\r
61     #error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative."\r
62 #endif\r
63 \r
64 /*-----------------------------------------------------------*/\r
65 \r
66 /**\r
67  * @brief Uninitialized value for @ref _initCalled.\r
68  */\r
69 #define MQTT_LIBRARY_UNINITIALIZED    ( ( uint32_t ) 0 )\r
70 \r
71 /**\r
72  * @brief Initialized value for @ref _initCalled.\r
73  */\r
74 #define MQTT_LIBRARY_INITIALIZED      ( ( uint32_t ) 1 )\r
75 \r
76 /*-----------------------------------------------------------*/\r
77 \r
78 /**\r
79  * @brief Check if the library is initialized.\r
80  *\r
81  * @return `true` if IotMqtt_Init was called; `false` otherwise.\r
82  */\r
83 static bool _checkInit( void );\r
84 \r
85 /**\r
86  * @brief Set the unsubscribed flag of an MQTT subscription.\r
87  *\r
88  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
89  * @param[in] pMatch Not used.\r
90  *\r
91  * @return Always returns `true`.\r
92  */\r
93 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
94                                               void * pMatch );\r
95 \r
96 /**\r
97  * @brief Destroy an MQTT subscription if its reference count is 0.\r
98  *\r
99  * @param[in] pData The subscription to destroy. This parameter is of type\r
100  * `void*` for compatibility with [free]\r
101  * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
102  */\r
103 static void _mqttSubscription_tryDestroy( void * pData );\r
104 \r
105 /**\r
106  * @brief Decrement the reference count of an MQTT operation and attempt to\r
107  * destroy it.\r
108  *\r
109  * @param[in] pData The operation data to destroy. This parameter is of type\r
110  * `void*` for compatibility with [free]\r
111  * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
112  */\r
113 static void _mqttOperation_tryDestroy( void * pData );\r
114 \r
115 /**\r
116  * @brief Initialize the keep-alive operation for an MQTT connection.\r
117  *\r
118  * @param[in] pNetworkInfo User-provided network information for the new\r
119  * connection.\r
120  * @param[in] keepAliveSeconds User-provided keep-alive interval.\r
121  * @param[out] pMqttConnection The MQTT connection associated with the keep-alive.\r
122  *\r
123  * @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
124  */\r
125 static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
126                                        uint16_t keepAliveSeconds,\r
127                                        _mqttConnection_t * pMqttConnection );\r
128 \r
129 /**\r
130  * @brief Initialize a network connection, creating it if necessary.\r
131  *\r
132  * @param[in] pNetworkInfo User-provided network information for the\r
133  * connection.\r
134  * @param[out] pNetworkConnection On success, the created and/or initialized network connection.\r
135  * @param[out] pCreatedNewNetworkConnection On success, `true` if a new network connection was created; `false` if an existing one will be used.\r
136  *\r
137  * @return Any #IotNetworkError_t, as defined by the network stack.\r
138  */\r
139 static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * pNetworkInfo,\r
140                                                    IotNetworkConnection_t * pNetworkConnection,\r
141                                                    bool * pCreatedNewNetworkConnection );\r
142 \r
143 /**\r
144  * @brief Creates a new MQTT connection and initializes its members.\r
145  *\r
146  * @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server.\r
147  * @param[in] pNetworkInfo User-provided network information for the new\r
148  * connection.\r
149  * @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection.\r
150  *\r
151  * @return Pointer to a newly-created MQTT connection; `NULL` on failure.\r
152  */\r
153 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
154                                                   const IotMqttNetworkInfo_t * pNetworkInfo,\r
155                                                   uint16_t keepAliveSeconds );\r
156 \r
157 /**\r
158  * @brief Destroys the members of an MQTT connection.\r
159  *\r
160  * @param[in] pMqttConnection Which connection to destroy.\r
161  */\r
162 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection );\r
163 \r
164 /**\r
165  * @brief Common setup function for subscribe and unsubscribe operations.\r
166  *\r
167  * See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a\r
168  * description of the parameters and return values.\r
169  */\r
170 static IotMqttError_t _subscriptionCommonSetup( IotMqttOperationType_t operation,\r
171                                                 IotMqttConnection_t mqttConnection,\r
172                                                 const IotMqttSubscription_t * pSubscriptionList,\r
173                                                 size_t subscriptionCount,\r
174                                                 uint32_t flags,\r
175                                                 IotMqttOperation_t * const pOperationReference );\r
176 \r
177 /**\r
178  * @brief Utility function for creating and serializing subscription requests\r
179  *\r
180  * See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a\r
181  * description of the parameters and return values.\r
182  */\r
183 static IotMqttError_t _subscriptionCreateAndSerialize( IotMqttOperationType_t operation,\r
184                                                        IotMqttConnection_t mqttConnection,\r
185                                                        IotMqttSerializeSubscribe_t serializeSubscription,\r
186                                                        const IotMqttSubscription_t * pSubscriptionList,\r
187                                                        size_t subscriptionCount,\r
188                                                        uint32_t flags,\r
189                                                        const IotMqttCallbackInfo_t * pCallbackInfo,\r
190                                                        _mqttOperation_t ** ppSubscriptionOperation );\r
191 \r
192 /**\r
193  * @brief The common component of both @ref mqtt_function_subscribeasync and @ref\r
194  * mqtt_function_unsubscribeasync.\r
195  *\r
196  * See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a\r
197  * description of the parameters and return values.\r
198  */\r
199 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
200                                            IotMqttConnection_t mqttConnection,\r
201                                            IotMqttSerializeSubscribe_t serializeSubscription,\r
202                                            const IotMqttSubscription_t * pSubscriptionList,\r
203                                            size_t subscriptionCount,\r
204                                            uint32_t flags,\r
205                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
206                                            IotMqttOperation_t * const pOperationReference );\r
207 \r
208 /**\r
209  * @cond DOXYGEN_IGNORE\r
210  * Doxygen should ignore this section.\r
211  *\r
212  * Declaration of local MQTT serializer override selectors\r
213  */\r
214 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
215     _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializePingreq_t,\r
216                                    _getMqttPingreqSerializer,\r
217                                    _IotMqtt_SerializePingreq,\r
218                                    serialize.pingreq )\r
219     _SERIALIZER_OVERRIDE_SELECTOR( IotMqtt_SerializePublish_t,\r
220                                    _getMqttPublishSerializer,\r
221                                    _IotMqtt_SerializePublish,\r
222                                    serialize.publish )\r
223     _SERIALIZER_OVERRIDE_SELECTOR( IotMqttFreePacket_t,\r
224                                    _getMqttFreePacketFunc,\r
225                                    _IotMqtt_FreePacket,\r
226                                    freePacket )\r
227     _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeSubscribe_t,\r
228                                    _getMqttSubscribeSerializer,\r
229                                    _IotMqtt_SerializeSubscribe,\r
230                                    serialize.subscribe )\r
231     _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeSubscribe_t,\r
232                                    _getMqttUnsubscribeSerializer,\r
233                                    _IotMqtt_SerializeUnsubscribe,\r
234                                    serialize.unsubscribe )\r
235     _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeConnect_t,\r
236                                    _getMqttConnectSerializer,\r
237                                    _IotMqtt_SerializeConnect,\r
238                                    serialize.connect )\r
239     _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeDisconnect_t,\r
240                                    _getMqttDisconnectSerializer,\r
241                                    _IotMqtt_SerializeDisconnect,\r
242                                    serialize.disconnect )\r
243 #else /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
244     #define _getMqttPingreqSerializer( pSerializer )        _IotMqtt_SerializePingreq\r
245     #define _getMqttPublishSerializer( pSerializer )        _IotMqtt_SerializePublish\r
246     #define _getMqttFreePacketFunc( pSerializer )           _IotMqtt_FreePacket\r
247     #define _getMqttSubscribeSerializer( pSerializer )      _IotMqtt_SerializeSubscribe\r
248     #define _getMqttUnsubscribeSerializer( pSerializer )    _IotMqtt_SerializeUnsubscribe\r
249     #define _getMqttConnectSerializer( pSerializer )        _IotMqtt_SerializeConnect\r
250     #define _getMqttDisconnectSerializer( pSerializer )     _IotMqtt_SerializeDisconnect\r
251 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
252 /** @endcond */\r
253 \r
254 /*-----------------------------------------------------------*/\r
255 \r
256 /**\r
257  * @brief Tracks whether @ref mqtt_function_init has been called.\r
258  *\r
259  * API functions will fail if @ref mqtt_function_init was not called.\r
260  */\r
261 static volatile uint32_t _initCalled = MQTT_LIBRARY_UNINITIALIZED;\r
262 \r
263 /*-----------------------------------------------------------*/\r
264 \r
265 static bool _checkInit( void )\r
266 {\r
267     bool status = true;\r
268 \r
269     if( _initCalled == MQTT_LIBRARY_UNINITIALIZED )\r
270     {\r
271         IotLogError( "IotMqtt_Init was not called." );\r
272 \r
273         status = false;\r
274     }\r
275     else\r
276     {\r
277         EMPTY_ELSE_MARKER;\r
278     }\r
279 \r
280     return status;\r
281 }\r
282 \r
283 /*-----------------------------------------------------------*/\r
284 \r
285 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
286                                               void * pMatch )\r
287 {\r
288     /* Because this function is called from a container function, the given link\r
289      * must never be NULL. */\r
290     IotMqtt_Assert( pSubscriptionLink != NULL );\r
291 \r
292     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
293                                                              pSubscriptionLink,\r
294                                                              link );\r
295 \r
296     /* Silence warnings about unused parameters. */\r
297     ( void ) pMatch;\r
298 \r
299     /* Set the unsubscribed flag. */\r
300     pSubscription->unsubscribed = true;\r
301 \r
302     return true;\r
303 }\r
304 \r
305 /*-----------------------------------------------------------*/\r
306 \r
307 static void _mqttSubscription_tryDestroy( void * pData )\r
308 {\r
309     _mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData;\r
310 \r
311     /* Reference count must not be negative. */\r
312     IotMqtt_Assert( pSubscription->references >= 0 );\r
313 \r
314     /* Unsubscribed flag should be set. */\r
315     IotMqtt_Assert( pSubscription->unsubscribed == true );\r
316 \r
317     /* Free the subscription if it has no references. */\r
318     if( pSubscription->references == 0 )\r
319     {\r
320         IotMqtt_FreeSubscription( pSubscription );\r
321     }\r
322     else\r
323     {\r
324         EMPTY_ELSE_MARKER;\r
325     }\r
326 }\r
327 \r
328 /*-----------------------------------------------------------*/\r
329 \r
330 static void _mqttOperation_tryDestroy( void * pData )\r
331 {\r
332     _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData;\r
333     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
334 \r
335     /* Incoming PUBLISH operations may always be freed. */\r
336     if( pOperation->incomingPublish == true )\r
337     {\r
338         /* Cancel the incoming PUBLISH operation's job. */\r
339         taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
340                                                 pOperation->job,\r
341                                                 NULL );\r
342 \r
343         /* If the operation's job was not canceled, it must be already executing.\r
344          * Any other return value is invalid. */\r
345         IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
346                         ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
347 \r
348         /* Check if the incoming PUBLISH job was canceled. */\r
349         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
350         {\r
351             /* Job was canceled. Process incoming PUBLISH now to clean up. */\r
352             _IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL,\r
353                                              pOperation->job,\r
354                                              pOperation );\r
355         }\r
356         else\r
357         {\r
358             /* The executing job will process the PUBLISH, so nothing is done here. */\r
359             EMPTY_ELSE_MARKER;\r
360         }\r
361     }\r
362     else\r
363     {\r
364         /* Decrement reference count and destroy operation if possible. */\r
365         if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true )\r
366         {\r
367             _IotMqtt_DestroyOperation( pOperation );\r
368         }\r
369         else\r
370         {\r
371             EMPTY_ELSE_MARKER;\r
372         }\r
373     }\r
374 }\r
375 \r
376 /*-----------------------------------------------------------*/\r
377 \r
378 static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
379                                        uint16_t keepAliveSeconds,\r
380                                        _mqttConnection_t * pMqttConnection )\r
381 {\r
382     bool status = true;\r
383     IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
384     IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS;\r
385 \r
386     /* Network information is not used when MQTT packet serializers are disabled. */\r
387     ( void ) pNetworkInfo;\r
388 \r
389     /* Set PINGREQ operation members. */\r
390     pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;\r
391 \r
392     /* Convert the keep-alive interval to milliseconds. */\r
393     pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;\r
394     pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;\r
395 \r
396     /* Generate a PINGREQ packet. */\r
397     serializeStatus = _getMqttPingreqSerializer( pMqttConnection->pSerializer )( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),\r
398                                                                                  &( pMqttConnection->pingreq.u.operation.packetSize ) );\r
399 \r
400     if( serializeStatus != IOT_MQTT_SUCCESS )\r
401     {\r
402         IotLogError( "Failed to allocate PINGREQ packet for new connection." );\r
403 \r
404         status = false;\r
405     }\r
406     else\r
407     {\r
408         /* Create the task pool job that processes keep-alive. */\r
409         jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
410                                            pMqttConnection,\r
411                                            &( pMqttConnection->pingreq.jobStorage ),\r
412                                            &( pMqttConnection->pingreq.job ) );\r
413 \r
414         /* Task pool job creation for a pre-allocated job should never fail.\r
415          * Abort the program if it does. */\r
416         if( jobStatus != IOT_TASKPOOL_SUCCESS )\r
417         {\r
418             IotLogError( "Failed to create keep-alive job for new connection." );\r
419 \r
420             IotMqtt_Assert( false );\r
421         }\r
422         else\r
423         {\r
424             EMPTY_ELSE_MARKER;\r
425         }\r
426 \r
427         /* Keep-alive references its MQTT connection, so increment reference. */\r
428         ( pMqttConnection->references )++;\r
429     }\r
430 \r
431     return status;\r
432 }\r
433 \r
434 /*-----------------------------------------------------------*/\r
435 \r
436 static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * pNetworkInfo,\r
437                                                    IotNetworkConnection_t * pNetworkConnection,\r
438                                                    bool * pCreatedNewNetworkConnection )\r
439 {\r
440     IOT_FUNCTION_ENTRY( IotNetworkError_t, IOT_NETWORK_SUCCESS );\r
441 \r
442     /* Network info must not be NULL. */\r
443     if( pNetworkInfo == NULL )\r
444     {\r
445         IotLogError( "Network information cannot be NULL." );\r
446 \r
447         IOT_SET_AND_GOTO_CLEANUP( IOT_NETWORK_BAD_PARAMETER );\r
448     }\r
449     else\r
450     {\r
451         EMPTY_ELSE_MARKER;\r
452     }\r
453 \r
454     /* Create a new network connection if requested. Otherwise, copy the existing\r
455      * network connection. */\r
456     if( pNetworkInfo->createNetworkConnection == true )\r
457     {\r
458         status = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,\r
459                                                           pNetworkInfo->u.setup.pNetworkCredentialInfo,\r
460                                                           pNetworkConnection );\r
461 \r
462         if( status == IOT_NETWORK_SUCCESS )\r
463         {\r
464             /* This MQTT connection owns the network connection it created and\r
465              * should destroy it on cleanup. */\r
466             *pCreatedNewNetworkConnection = true;\r
467         }\r
468         else\r
469         {\r
470             IotLogError( "Failed to create network connection: %d", status );\r
471 \r
472             IOT_GOTO_CLEANUP();\r
473         }\r
474     }\r
475     else\r
476     {\r
477         /* A connection already exists; the caller should not destroy\r
478          * it on cleanup. */\r
479         *pNetworkConnection = pNetworkInfo->u.pNetworkConnection;\r
480         *pCreatedNewNetworkConnection = false;\r
481     }\r
482 \r
483     IOT_FUNCTION_EXIT_NO_CLEANUP();\r
484 }\r
485 \r
486 /*-----------------------------------------------------------*/\r
487 \r
488 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
489                                                   const IotMqttNetworkInfo_t * pNetworkInfo,\r
490                                                   uint16_t keepAliveSeconds )\r
491 {\r
492     IOT_FUNCTION_ENTRY( bool, true );\r
493     _mqttConnection_t * pMqttConnection = NULL;\r
494     bool referencesMutexCreated = false, subscriptionMutexCreated = false;\r
495 \r
496     /* Allocate memory for the new MQTT connection. */\r
497     pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) );\r
498 \r
499     if( pMqttConnection == NULL )\r
500     {\r
501         IotLogError( "Failed to allocate memory for new connection." );\r
502 \r
503         IOT_SET_AND_GOTO_CLEANUP( false );\r
504     }\r
505     else\r
506     {\r
507         /* Clear the MQTT connection, then copy the MQTT server mode, network\r
508          * interface, and disconnect callback. */\r
509         ( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) );\r
510         pMqttConnection->awsIotMqttMode = awsIotMqttMode;\r
511         pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface;\r
512         pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback;\r
513 \r
514         /* Start a new MQTT connection with a reference count of 1. */\r
515         pMqttConnection->references = 1;\r
516     }\r
517 \r
518     /* Create the references mutex for a new connection. It is a recursive mutex. */\r
519     referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true );\r
520 \r
521     if( referencesMutexCreated == false )\r
522     {\r
523         IotLogError( "Failed to create references mutex for new connection." );\r
524 \r
525         IOT_SET_AND_GOTO_CLEANUP( false );\r
526     }\r
527     else\r
528     {\r
529         EMPTY_ELSE_MARKER;\r
530     }\r
531 \r
532     /* Create the subscription mutex for a new connection. */\r
533     subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false );\r
534 \r
535     if( subscriptionMutexCreated == false )\r
536     {\r
537         IotLogError( "Failed to create subscription mutex for new connection." );\r
538 \r
539         IOT_SET_AND_GOTO_CLEANUP( false );\r
540     }\r
541     else\r
542     {\r
543         EMPTY_ELSE_MARKER;\r
544     }\r
545 \r
546     /* Create the new connection's subscription and operation lists. */\r
547     IotListDouble_Create( &( pMqttConnection->subscriptionList ) );\r
548     IotListDouble_Create( &( pMqttConnection->pendingProcessing ) );\r
549     IotListDouble_Create( &( pMqttConnection->pendingResponse ) );\r
550 \r
551     /* Check if keep-alive is active for this connection. */\r
552     if( keepAliveSeconds != 0 )\r
553     {\r
554         if( _createKeepAliveOperation( pNetworkInfo,\r
555                                        keepAliveSeconds,\r
556                                        pMqttConnection ) == false )\r
557         {\r
558             IOT_SET_AND_GOTO_CLEANUP( false );\r
559         }\r
560         else\r
561         {\r
562             EMPTY_ELSE_MARKER;\r
563         }\r
564     }\r
565     else\r
566     {\r
567         EMPTY_ELSE_MARKER;\r
568     }\r
569 \r
570     /* Clean up mutexes and connection if this function failed. */\r
571     IOT_FUNCTION_CLEANUP_BEGIN();\r
572 \r
573     if( status == false )\r
574     {\r
575         if( subscriptionMutexCreated == true )\r
576         {\r
577             IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
578         }\r
579         else\r
580         {\r
581             EMPTY_ELSE_MARKER;\r
582         }\r
583 \r
584         if( referencesMutexCreated == true )\r
585         {\r
586             IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
587         }\r
588         else\r
589         {\r
590             EMPTY_ELSE_MARKER;\r
591         }\r
592 \r
593         if( pMqttConnection != NULL )\r
594         {\r
595             IotMqtt_FreeConnection( pMqttConnection );\r
596             pMqttConnection = NULL;\r
597         }\r
598         else\r
599         {\r
600             EMPTY_ELSE_MARKER;\r
601         }\r
602     }\r
603     else\r
604     {\r
605         EMPTY_ELSE_MARKER;\r
606     }\r
607 \r
608     return pMqttConnection;\r
609 }\r
610 \r
611 /*-----------------------------------------------------------*/\r
612 \r
613 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )\r
614 {\r
615     IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
616 \r
617     /* Clean up keep-alive if still allocated. */\r
618     if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
619     {\r
620         IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
621 \r
622         _getMqttFreePacketFunc( pMqttConnection->pSerializer )( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
623 \r
624         /* Clear data about the keep-alive. */\r
625         pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
626         pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
627         pMqttConnection->pingreq.u.operation.packetSize = 0;\r
628 \r
629         /* Decrement reference count. */\r
630         pMqttConnection->references--;\r
631     }\r
632     else\r
633     {\r
634         EMPTY_ELSE_MARKER;\r
635     }\r
636 \r
637     /* A connection to be destroyed should have no keep-alive and at most 1\r
638      * reference. */\r
639     IotMqtt_Assert( pMqttConnection->references <= 1 );\r
640     IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );\r
641     IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );\r
642     IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );\r
643 \r
644     /* Remove all subscriptions. */\r
645     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
646     IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
647                                     _mqttSubscription_setUnsubscribe,\r
648                                     NULL,\r
649                                     _mqttSubscription_tryDestroy,\r
650                                     offsetof( _mqttSubscription_t, link ) );\r
651     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
652 \r
653     /* Destroy an owned network connection. */\r
654     if( pMqttConnection->ownNetworkConnection == true )\r
655     {\r
656         networkStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );\r
657 \r
658         if( networkStatus != IOT_NETWORK_SUCCESS )\r
659         {\r
660             IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",\r
661                         pMqttConnection );\r
662         }\r
663         else\r
664         {\r
665             IotLogInfo( "(MQTT connection %p) Network connection destroyed.",\r
666                         pMqttConnection );\r
667         }\r
668     }\r
669     else\r
670     {\r
671         EMPTY_ELSE_MARKER;\r
672     }\r
673 \r
674     /* Destroy mutexes. */\r
675     IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
676     IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
677 \r
678     IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );\r
679 \r
680     /* Free connection. */\r
681     IotMqtt_FreeConnection( pMqttConnection );\r
682 }\r
683 \r
684 /*-----------------------------------------------------------*/\r
685 static IotMqttError_t _subscriptionCommonSetup( IotMqttOperationType_t operation,\r
686                                                 IotMqttConnection_t mqttConnection,\r
687                                                 const IotMqttSubscription_t * pSubscriptionList,\r
688                                                 size_t subscriptionCount,\r
689                                                 uint32_t flags,\r
690                                                 IotMqttOperation_t * const pOperationReference )\r
691 {\r
692     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
693 \r
694     /* This function should only be called for subscribe or unsubscribe. */\r
695     IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||\r
696                     ( operation == IOT_MQTT_UNSUBSCRIBE ) );\r
697 \r
698     /* Check that IotMqtt_Init was called. */\r
699     if( _checkInit() == false )\r
700     {\r
701         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );\r
702     }\r
703     else\r
704     {\r
705         EMPTY_ELSE_MARKER;\r
706     }\r
707 \r
708     /* Check that all elements in the subscription list are valid. */\r
709     if( _IotMqtt_ValidateSubscriptionList( operation,\r
710                                            mqttConnection->awsIotMqttMode,\r
711                                            pSubscriptionList,\r
712                                            subscriptionCount ) == false )\r
713     {\r
714         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
715     }\r
716     else\r
717     {\r
718         EMPTY_ELSE_MARKER;\r
719     }\r
720 \r
721     /* Check that a reference pointer is provided for a waitable operation. */\r
722     if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
723     {\r
724         if( pOperationReference == NULL )\r
725         {\r
726             IotLogError( "Reference must be provided for a waitable %s.",\r
727                          IotMqtt_OperationType( operation ) );\r
728 \r
729             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
730         }\r
731         else\r
732         {\r
733             EMPTY_ELSE_MARKER;\r
734         }\r
735     }\r
736     else\r
737     {\r
738         EMPTY_ELSE_MARKER;\r
739     }\r
740 \r
741     IOT_FUNCTION_EXIT_NO_CLEANUP();\r
742 }\r
743 \r
744 /*-----------------------------------------------------------*/\r
745 \r
746 static IotMqttError_t _subscriptionCreateAndSerialize( IotMqttOperationType_t operation,\r
747                                                        IotMqttConnection_t mqttConnection,\r
748                                                        IotMqttSerializeSubscribe_t serializeSubscription,\r
749                                                        const IotMqttSubscription_t * pSubscriptionList,\r
750                                                        size_t subscriptionCount,\r
751                                                        uint32_t flags,\r
752                                                        const IotMqttCallbackInfo_t * pCallbackInfo,\r
753                                                        _mqttOperation_t ** ppSubscriptionOperation )\r
754 {\r
755     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
756     _mqttOperation_t * pSubscriptionOperation = NULL;\r
757 \r
758     /* Create a subscription operation. */\r
759     status = _IotMqtt_CreateOperation( mqttConnection,\r
760                                        flags,\r
761                                        pCallbackInfo,\r
762                                        ppSubscriptionOperation );\r
763 \r
764     if( status != IOT_MQTT_SUCCESS )\r
765     {\r
766         IOT_GOTO_CLEANUP();\r
767     }\r
768     else\r
769     {\r
770         pSubscriptionOperation = ( *ppSubscriptionOperation );\r
771     }\r
772 \r
773     /* Check the subscription operation data and set the operation type. */\r
774     IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
775     IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );\r
776     pSubscriptionOperation->u.operation.type = operation;\r
777 \r
778     /* Generate a subscription packet from the subscription list. */\r
779     status = serializeSubscription( pSubscriptionList,\r
780                                     subscriptionCount,\r
781                                     &( pSubscriptionOperation->u.operation.pMqttPacket ),\r
782                                     &( pSubscriptionOperation->u.operation.packetSize ),\r
783                                     &( pSubscriptionOperation->u.operation.packetIdentifier ) );\r
784 \r
785     if( status != IOT_MQTT_SUCCESS )\r
786     {\r
787         IOT_GOTO_CLEANUP();\r
788     }\r
789     else\r
790     {\r
791         EMPTY_ELSE_MARKER;\r
792     }\r
793 \r
794     /* Check the serialized MQTT packet. */\r
795     IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );\r
796     IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );\r
797 \r
798     IOT_FUNCTION_EXIT_NO_CLEANUP();\r
799 }\r
800 \r
801 /*-----------------------------------------------------------*/\r
802 \r
803 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
804                                            IotMqttConnection_t mqttConnection,\r
805                                            IotMqttSerializeSubscribe_t serializeSubscription,\r
806                                            const IotMqttSubscription_t * pSubscriptionList,\r
807                                            size_t subscriptionCount,\r
808                                            uint32_t flags,\r
809                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
810                                            IotMqttOperation_t * const pOperationReference )\r
811 {\r
812     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
813     _mqttOperation_t * pSubscriptionOperation = NULL;\r
814 \r
815     /* Create and serialize the subscription operation. */\r
816     status = _subscriptionCreateAndSerialize( operation,\r
817                                               mqttConnection,\r
818                                               serializeSubscription,\r
819                                               pSubscriptionList,\r
820                                               subscriptionCount,\r
821                                               flags,\r
822                                               pCallbackInfo,\r
823                                               &pSubscriptionOperation );\r
824 \r
825     if( status != IOT_MQTT_SUCCESS )\r
826     {\r
827         IOT_GOTO_CLEANUP();\r
828     }\r
829     else\r
830     {\r
831         EMPTY_ELSE_MARKER;\r
832     }\r
833 \r
834     /* Add the subscription list for a SUBSCRIBE. */\r
835     if( operation == IOT_MQTT_SUBSCRIBE )\r
836     {\r
837         status = _IotMqtt_AddSubscriptions( mqttConnection,\r
838                                             pSubscriptionOperation->u.operation.packetIdentifier,\r
839                                             pSubscriptionList,\r
840                                             subscriptionCount );\r
841 \r
842         if( status != IOT_MQTT_SUCCESS )\r
843         {\r
844             IOT_GOTO_CLEANUP();\r
845         }\r
846         else\r
847         {\r
848             EMPTY_ELSE_MARKER;\r
849         }\r
850     }\r
851 \r
852     /* Set the reference, if provided. */\r
853     if( pOperationReference != NULL )\r
854     {\r
855         *pOperationReference = pSubscriptionOperation;\r
856     }\r
857     else\r
858     {\r
859         EMPTY_ELSE_MARKER;\r
860     }\r
861 \r
862     /* Send the SUBSCRIBE packet. */\r
863     if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND )\r
864     {\r
865         _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pSubscriptionOperation->job, pSubscriptionOperation );\r
866     }\r
867     else\r
868     {\r
869         status = _IotMqtt_ScheduleOperation( pSubscriptionOperation,\r
870                                              _IotMqtt_ProcessSend,\r
871                                              0 );\r
872 \r
873         if( status != IOT_MQTT_SUCCESS )\r
874         {\r
875             IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",\r
876                          mqttConnection,\r
877                          IotMqtt_OperationType( operation ) );\r
878 \r
879             if( operation == IOT_MQTT_SUBSCRIBE )\r
880             {\r
881                 _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,\r
882                                                      pSubscriptionOperation->u.operation.packetIdentifier,\r
883                                                      MQTT_REMOVE_ALL_SUBSCRIPTIONS );\r
884             }\r
885             else\r
886             {\r
887                 EMPTY_ELSE_MARKER;\r
888             }\r
889 \r
890             /* Clear the previously set (and now invalid) reference. */\r
891             if( pOperationReference != NULL )\r
892             {\r
893                 *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;\r
894             }\r
895             else\r
896             {\r
897                 EMPTY_ELSE_MARKER;\r
898             }\r
899 \r
900             IOT_GOTO_CLEANUP();\r
901         }\r
902     }\r
903 \r
904     /* Clean up if this function failed. */\r
905     IOT_FUNCTION_CLEANUP_BEGIN();\r
906 \r
907     if( status != IOT_MQTT_SUCCESS )\r
908     {\r
909         if( pSubscriptionOperation != NULL )\r
910         {\r
911             _IotMqtt_DestroyOperation( pSubscriptionOperation );\r
912         }\r
913         else\r
914         {\r
915             EMPTY_ELSE_MARKER;\r
916         }\r
917     }\r
918     else\r
919     {\r
920         status = IOT_MQTT_STATUS_PENDING;\r
921 \r
922         IotLogInfo( "(MQTT connection %p) %s operation scheduled.",\r
923                     mqttConnection,\r
924                     IotMqtt_OperationType( operation ) );\r
925     }\r
926 \r
927     IOT_FUNCTION_CLEANUP_END();\r
928 }\r
929 \r
930 /*-----------------------------------------------------------*/\r
931 \r
932 bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
933 {\r
934     bool disconnected = false;\r
935 \r
936     /* Lock the mutex protecting the reference count. */\r
937     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
938 \r
939     /* Reference count must not be negative. */\r
940     IotMqtt_Assert( pMqttConnection->references >= 0 );\r
941 \r
942     /* Read connection status. */\r
943     disconnected = pMqttConnection->disconnected;\r
944 \r
945     /* Increment the connection's reference count if it is not disconnected. */\r
946     if( disconnected == false )\r
947     {\r
948         ( pMqttConnection->references )++;\r
949         IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
950                      pMqttConnection,\r
951                      ( long int ) pMqttConnection->references - 1,\r
952                      ( long int ) pMqttConnection->references );\r
953     }\r
954     else\r
955     {\r
956         IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );\r
957     }\r
958 \r
959     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
960 \r
961     return( disconnected == false );\r
962 }\r
963 \r
964 /*-----------------------------------------------------------*/\r
965 \r
966 void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
967 {\r
968     bool destroyConnection = false;\r
969 \r
970     /* Lock the mutex protecting the reference count. */\r
971     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
972 \r
973     /* Decrement reference count. It must not be negative. */\r
974     ( pMqttConnection->references )--;\r
975     IotMqtt_Assert( pMqttConnection->references >= 0 );\r
976 \r
977     IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
978                  pMqttConnection,\r
979                  ( long int ) pMqttConnection->references + 1,\r
980                  ( long int ) pMqttConnection->references );\r
981 \r
982     /* Check if this connection may be destroyed. */\r
983     if( pMqttConnection->references == 0 )\r
984     {\r
985         destroyConnection = true;\r
986     }\r
987     else\r
988     {\r
989         EMPTY_ELSE_MARKER;\r
990     }\r
991 \r
992     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
993 \r
994     /* Destroy an unreferenced MQTT connection. */\r
995     if( destroyConnection == true )\r
996     {\r
997         IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",\r
998                      pMqttConnection );\r
999         _destroyMqttConnection( pMqttConnection );\r
1000     }\r
1001     else\r
1002     {\r
1003         EMPTY_ELSE_MARKER;\r
1004     }\r
1005 }\r
1006 \r
1007 /*-----------------------------------------------------------*/\r
1008 \r
1009 IotMqttError_t IotMqtt_Init( void )\r
1010 {\r
1011     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
1012     uint32_t allowInitialization = Atomic_CompareAndSwap_u32( &_initCalled,\r
1013                                                               MQTT_LIBRARY_INITIALIZED,\r
1014                                                               MQTT_LIBRARY_UNINITIALIZED );\r
1015 \r
1016     if( allowInitialization == 1 )\r
1017     {\r
1018         /* Call any additional serializer initialization function if serializer\r
1019          * overrides are enabled. */\r
1020         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1021             #ifdef _IotMqtt_InitSerializeAdditional\r
1022                 if( _IotMqtt_InitSerializeAdditional() == false )\r
1023                 {\r
1024                     /* Log initialization status. */\r
1025                     IotLogError( "Failed to initialize MQTT library serializer. " );\r
1026 \r
1027                     status = IOT_MQTT_INIT_FAILED;\r
1028                 }\r
1029                 else\r
1030                 {\r
1031                     EMPTY_ELSE_MARKER;\r
1032                 }\r
1033             #endif /* ifdef _IotMqtt_InitSerializeAdditional */\r
1034         #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
1035 \r
1036         if( status == IOT_MQTT_SUCCESS )\r
1037         {\r
1038             IotLogInfo( "MQTT library successfully initialized." );\r
1039         }\r
1040         else\r
1041         {\r
1042             EMPTY_ELSE_MARKER;\r
1043         }\r
1044     }\r
1045     else\r
1046     {\r
1047         IotLogWarn( "IotMqtt_Init called with library already initialized." );\r
1048     }\r
1049 \r
1050     return status;\r
1051 }\r
1052 \r
1053 /*-----------------------------------------------------------*/\r
1054 \r
1055 void IotMqtt_Cleanup( void )\r
1056 {\r
1057     uint32_t allowCleanup = Atomic_CompareAndSwap_u32( &_initCalled,\r
1058                                                        MQTT_LIBRARY_UNINITIALIZED,\r
1059                                                        MQTT_LIBRARY_INITIALIZED );\r
1060 \r
1061     if( allowCleanup == 1 )\r
1062     {\r
1063         /* Call any additional serializer cleanup initialization function if serializer\r
1064          * overrides are enabled. */\r
1065         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1066             #ifdef _IotMqtt_CleanupSerializeAdditional\r
1067                 _IotMqtt_CleanupSerializeAdditional();\r
1068             #endif\r
1069         #endif\r
1070 \r
1071         IotLogInfo( "MQTT library cleanup done." );\r
1072     }\r
1073     else\r
1074     {\r
1075         IotLogWarn( "IotMqtt_Init was not called before IotMqtt_Cleanup." );\r
1076     }\r
1077 }\r
1078 \r
1079 /*-----------------------------------------------------------*/\r
1080 \r
1081 IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,\r
1082                                 const IotMqttConnectInfo_t * pConnectInfo,\r
1083                                 uint32_t timeoutMs,\r
1084                                 IotMqttConnection_t * const pMqttConnection )\r
1085 {\r
1086     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
1087     bool ownNetworkConnection = false;\r
1088     IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
1089     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
1090     IotNetworkConnection_t pNetworkConnection = { 0 };\r
1091     _mqttOperation_t * pOperation = NULL;\r
1092     _mqttConnection_t * pNewMqttConnection = NULL;\r
1093 \r
1094     /* Check that IotMqtt_Init was called. */\r
1095     if( _checkInit() == false )\r
1096     {\r
1097         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );\r
1098     }\r
1099     else\r
1100     {\r
1101         EMPTY_ELSE_MARKER;\r
1102     }\r
1103 \r
1104     /* Validate network interface and connect info. */\r
1105     if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )\r
1106     {\r
1107         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1108     }\r
1109     else\r
1110     {\r
1111         EMPTY_ELSE_MARKER;\r
1112     }\r
1113 \r
1114     networkStatus = _createNetworkConnection( pNetworkInfo,\r
1115                                               &pNetworkConnection,\r
1116                                               &ownNetworkConnection );\r
1117 \r
1118     if( networkStatus != IOT_NETWORK_SUCCESS )\r
1119     {\r
1120         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
1121     }\r
1122     else\r
1123     {\r
1124         EMPTY_ELSE_MARKER;\r
1125     }\r
1126 \r
1127     IotLogInfo( "Establishing new MQTT connection." );\r
1128 \r
1129     /* Initialize a new MQTT connection object. */\r
1130     pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,\r
1131                                                 pNetworkInfo,\r
1132                                                 pConnectInfo->keepAliveSeconds );\r
1133 \r
1134     if( pNewMqttConnection == NULL )\r
1135     {\r
1136         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
1137     }\r
1138     else\r
1139     {\r
1140         /* Set the network connection associated with the MQTT connection. */\r
1141         pNewMqttConnection->pNetworkConnection = pNetworkConnection;\r
1142         pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;\r
1143 \r
1144         /* Set the MQTT packet serializer overrides. */\r
1145         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1146             pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;\r
1147         #else\r
1148             pNewMqttConnection->pSerializer = NULL;\r
1149         #endif\r
1150     }\r
1151 \r
1152     /* Set the MQTT receive callback. */\r
1153     networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,\r
1154                                                                                IotMqtt_ReceiveCallback,\r
1155                                                                                pNewMqttConnection );\r
1156 \r
1157     if( networkStatus != IOT_NETWORK_SUCCESS )\r
1158     {\r
1159         IotLogError( "Failed to set MQTT network receive callback." );\r
1160 \r
1161         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
1162     }\r
1163     else\r
1164     {\r
1165         EMPTY_ELSE_MARKER;\r
1166     }\r
1167 \r
1168     /* Create a CONNECT operation. */\r
1169     status = _IotMqtt_CreateOperation( pNewMqttConnection,\r
1170                                        IOT_MQTT_FLAG_WAITABLE,\r
1171                                        NULL,\r
1172                                        &pOperation );\r
1173 \r
1174     if( status != IOT_MQTT_SUCCESS )\r
1175     {\r
1176         IOT_GOTO_CLEANUP();\r
1177     }\r
1178     else\r
1179     {\r
1180         EMPTY_ELSE_MARKER;\r
1181     }\r
1182 \r
1183     /* Ensure the members set by operation creation and serialization\r
1184      * are appropriate for a blocking CONNECT. */\r
1185     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1186     IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
1187                     == IOT_MQTT_FLAG_WAITABLE );\r
1188     IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
1189 \r
1190     /* Set the operation type. */\r
1191     pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
1192 \r
1193     /* Add previous session subscriptions. */\r
1194     if( pConnectInfo->pPreviousSubscriptions != NULL )\r
1195     {\r
1196         /* Previous subscription count should have been validated as nonzero. */\r
1197         IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );\r
1198 \r
1199         status = _IotMqtt_AddSubscriptions( pNewMqttConnection,\r
1200                                             2,\r
1201                                             pConnectInfo->pPreviousSubscriptions,\r
1202                                             pConnectInfo->previousSubscriptionCount );\r
1203 \r
1204         if( status != IOT_MQTT_SUCCESS )\r
1205         {\r
1206             IOT_GOTO_CLEANUP();\r
1207         }\r
1208         else\r
1209         {\r
1210             EMPTY_ELSE_MARKER;\r
1211         }\r
1212     }\r
1213     else\r
1214     {\r
1215         EMPTY_ELSE_MARKER;\r
1216     }\r
1217 \r
1218     /* Convert the connect info and will info objects to an MQTT CONNECT packet. */\r
1219     status = _getMqttConnectSerializer( pNetworkInfo->pMqttSerializer )( pConnectInfo,\r
1220                                                                          &( pOperation->u.operation.pMqttPacket ),\r
1221                                                                          &( pOperation->u.operation.packetSize ) );\r
1222 \r
1223     if( status != IOT_MQTT_SUCCESS )\r
1224     {\r
1225         IOT_GOTO_CLEANUP();\r
1226     }\r
1227     else\r
1228     {\r
1229         EMPTY_ELSE_MARKER;\r
1230     }\r
1231 \r
1232     /* Check the serialized MQTT packet. */\r
1233     IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1234     IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1235 \r
1236     /* Send the CONNECT packet. */\r
1237     _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );\r
1238 \r
1239     /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */\r
1240     status = IotMqtt_Wait( pOperation, timeoutMs );\r
1241 \r
1242     /* The call to wait cleans up the CONNECT operation, so set the pointer\r
1243      * to NULL. */\r
1244     pOperation = NULL;\r
1245 \r
1246     /* When a connection is successfully established, schedule keep-alive job. */\r
1247     if( status == IOT_MQTT_SUCCESS )\r
1248     {\r
1249         /* Check if a keep-alive job should be scheduled. */\r
1250         if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
1251         {\r
1252             IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
1253 \r
1254             taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
1255                                                            pNewMqttConnection->pingreq.job,\r
1256                                                            pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );\r
1257 \r
1258             if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
1259             {\r
1260                 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );\r
1261             }\r
1262             else\r
1263             {\r
1264                 EMPTY_ELSE_MARKER;\r
1265             }\r
1266         }\r
1267         else\r
1268         {\r
1269             EMPTY_ELSE_MARKER;\r
1270         }\r
1271     }\r
1272     else\r
1273     {\r
1274         EMPTY_ELSE_MARKER;\r
1275     }\r
1276 \r
1277     IOT_FUNCTION_CLEANUP_BEGIN();\r
1278 \r
1279     if( status != IOT_MQTT_SUCCESS )\r
1280     {\r
1281         IotLogError( "Failed to establish new MQTT connection, error %s.",\r
1282                      IotMqtt_strerror( status ) );\r
1283 \r
1284         /* The network connection must be closed if it was created. */\r
1285         if( ownNetworkConnection == true )\r
1286         {\r
1287             networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );\r
1288 \r
1289             if( networkStatus != IOT_NETWORK_SUCCESS )\r
1290             {\r
1291                 IotLogWarn( "Failed to close network connection." );\r
1292             }\r
1293             else\r
1294             {\r
1295                 IotLogInfo( "Network connection closed on error." );\r
1296             }\r
1297         }\r
1298         else\r
1299         {\r
1300             EMPTY_ELSE_MARKER;\r
1301         }\r
1302 \r
1303         if( pOperation != NULL )\r
1304         {\r
1305             _IotMqtt_DestroyOperation( pOperation );\r
1306         }\r
1307         else\r
1308         {\r
1309             EMPTY_ELSE_MARKER;\r
1310         }\r
1311 \r
1312         if( pNewMqttConnection != NULL )\r
1313         {\r
1314             _destroyMqttConnection( pNewMqttConnection );\r
1315         }\r
1316         else\r
1317         {\r
1318             EMPTY_ELSE_MARKER;\r
1319         }\r
1320     }\r
1321     else\r
1322     {\r
1323         IotLogInfo( "New MQTT connection %p established.", pMqttConnection );\r
1324 \r
1325         /* Set the output parameter. */\r
1326         *pMqttConnection = pNewMqttConnection;\r
1327     }\r
1328 \r
1329     IOT_FUNCTION_CLEANUP_END();\r
1330 }\r
1331 \r
1332 /*-----------------------------------------------------------*/\r
1333 \r
1334 void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,\r
1335                          uint32_t flags )\r
1336 {\r
1337     bool disconnected = false, initCalled = false;\r
1338     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1339     _mqttOperation_t * pOperation = NULL;\r
1340 \r
1341     /* Check that IotMqtt_Init was called. */\r
1342     initCalled = _checkInit();\r
1343 \r
1344     if( initCalled == false )\r
1345     {\r
1346         IOT_GOTO_CLEANUP();\r
1347     }\r
1348     else\r
1349     {\r
1350         EMPTY_ELSE_MARKER;\r
1351     }\r
1352 \r
1353     /* Only send a DISCONNECT packet if the connection is active and the "cleanup only"\r
1354      * flag is not set. */\r
1355     if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == IOT_MQTT_FLAG_CLEANUP_ONLY )\r
1356     {\r
1357         IOT_GOTO_CLEANUP();\r
1358     }\r
1359 \r
1360     /* Read the connection status. */\r
1361     IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
1362     disconnected = mqttConnection->disconnected;\r
1363     IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
1364 \r
1365     if( disconnected == true )\r
1366     {\r
1367         IOT_GOTO_CLEANUP();\r
1368     }\r
1369 \r
1370     IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );\r
1371 \r
1372     /* Create a DISCONNECT operation. This function blocks until the DISCONNECT\r
1373      * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */\r
1374     status = _IotMqtt_CreateOperation( mqttConnection,\r
1375                                        IOT_MQTT_FLAG_WAITABLE,\r
1376                                        NULL,\r
1377                                        &pOperation );\r
1378 \r
1379     if( status == IOT_MQTT_SUCCESS )\r
1380     {\r
1381         /* Ensure that the members set by operation creation and serialization\r
1382          * are appropriate for a blocking DISCONNECT. */\r
1383         IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1384         IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
1385                         == IOT_MQTT_FLAG_WAITABLE );\r
1386         IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
1387 \r
1388         /* Set the operation type. */\r
1389         pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
1390 \r
1391         /* Generate a DISCONNECT packet. */\r
1392         status = _getMqttDisconnectSerializer( mqttConnection->pSerializer )( &( pOperation->u.operation.pMqttPacket ),\r
1393                                                                               &( pOperation->u.operation.packetSize ) );\r
1394     }\r
1395     else\r
1396     {\r
1397         EMPTY_ELSE_MARKER;\r
1398     }\r
1399 \r
1400     if( status == IOT_MQTT_SUCCESS )\r
1401     {\r
1402         /* Check the serialized MQTT packet. */\r
1403         IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1404         IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1405 \r
1406         /* Send the DISCONNECT packet. */\r
1407         _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );\r
1408 \r
1409         /* Wait a short time for the DISCONNECT packet to be transmitted. */\r
1410         status = IotMqtt_Wait( pOperation,\r
1411                                IOT_MQTT_RESPONSE_WAIT_MS );\r
1412 \r
1413         /* A wait on DISCONNECT should only ever return SUCCESS, TIMEOUT,\r
1414          * or NETWORK ERROR. */\r
1415         if( status == IOT_MQTT_SUCCESS )\r
1416         {\r
1417             IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );\r
1418         }\r
1419         else\r
1420         {\r
1421             IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||\r
1422                             ( status == IOT_MQTT_NETWORK_ERROR ) );\r
1423 \r
1424             IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",\r
1425                         mqttConnection,\r
1426                         IotMqtt_strerror( status ) );\r
1427         }\r
1428     }\r
1429     else\r
1430     {\r
1431         EMPTY_ELSE_MARKER;\r
1432     }\r
1433 \r
1434     /* This function has no return value and no cleanup, but uses the cleanup\r
1435      * label to exit on error. */\r
1436     IOT_FUNCTION_CLEANUP_BEGIN();\r
1437 \r
1438     if( initCalled == true )\r
1439     {\r
1440         /* Close the underlying network connection. This also cleans up keep-alive. */\r
1441         _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,\r
1442                                          mqttConnection );\r
1443 \r
1444         /* Check if the connection may be destroyed. */\r
1445         IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
1446 \r
1447         /* At this point, the connection should be marked disconnected. */\r
1448         IotMqtt_Assert( mqttConnection->disconnected == true );\r
1449 \r
1450         /* Attempt cancel and destroy each operation in the connection's lists. */\r
1451         IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),\r
1452                                  _mqttOperation_tryDestroy,\r
1453                                  offsetof( _mqttOperation_t, link ) );\r
1454 \r
1455         IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),\r
1456                                  _mqttOperation_tryDestroy,\r
1457                                  offsetof( _mqttOperation_t, link ) );\r
1458 \r
1459         IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
1460 \r
1461         /* Decrement the connection reference count and destroy it if possible. */\r
1462         _IotMqtt_DecrementConnectionReferences( mqttConnection );\r
1463     }\r
1464 }\r
1465 \r
1466 /*-----------------------------------------------------------*/\r
1467 \r
1468 IotMqttError_t IotMqtt_SubscribeAsync( IotMqttConnection_t mqttConnection,\r
1469                                        const IotMqttSubscription_t * pSubscriptionList,\r
1470                                        size_t subscriptionCount,\r
1471                                        uint32_t flags,\r
1472                                        const IotMqttCallbackInfo_t * pCallbackInfo,\r
1473                                        IotMqttOperation_t * const pSubscribeOperation )\r
1474 {\r
1475     IotMqttError_t status = _subscriptionCommonSetup( IOT_MQTT_SUBSCRIBE,\r
1476                                                       mqttConnection,\r
1477                                                       pSubscriptionList,\r
1478                                                       subscriptionCount,\r
1479                                                       flags,\r
1480                                                       pSubscribeOperation );\r
1481 \r
1482     if( IOT_MQTT_SUCCESS == status )\r
1483     {\r
1484         status = _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
1485                                       mqttConnection,\r
1486                                       _getMqttSubscribeSerializer( mqttConnection->pSerializer ),\r
1487                                       pSubscriptionList,\r
1488                                       subscriptionCount,\r
1489                                       flags,\r
1490                                       pCallbackInfo,\r
1491                                       pSubscribeOperation );\r
1492     }\r
1493     else\r
1494     {\r
1495         EMPTY_ELSE_MARKER;\r
1496     }\r
1497 \r
1498     return status;\r
1499 }\r
1500 \r
1501 /*-----------------------------------------------------------*/\r
1502 \r
1503 IotMqttError_t IotMqtt_SubscribeSync( IotMqttConnection_t mqttConnection,\r
1504                                       const IotMqttSubscription_t * pSubscriptionList,\r
1505                                       size_t subscriptionCount,\r
1506                                       uint32_t flags,\r
1507                                       uint32_t timeoutMs )\r
1508 {\r
1509     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1510     IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1511 \r
1512     /* Flags are not used, but the parameter is present for future compatibility. */\r
1513     ( void ) flags;\r
1514 \r
1515     /* Call the asynchronous SUBSCRIBE function. */\r
1516     status = IotMqtt_SubscribeAsync( mqttConnection,\r
1517                                      pSubscriptionList,\r
1518                                      subscriptionCount,\r
1519                                      IOT_MQTT_FLAG_WAITABLE | MQTT_INTERNAL_FLAG_BLOCK_ON_SEND,\r
1520                                      NULL,\r
1521                                      &subscribeOperation );\r
1522 \r
1523     /* Wait for the SUBSCRIBE operation to complete. */\r
1524     if( status == IOT_MQTT_STATUS_PENDING )\r
1525     {\r
1526         status = IotMqtt_Wait( subscribeOperation, timeoutMs );\r
1527     }\r
1528     else\r
1529     {\r
1530         EMPTY_ELSE_MARKER;\r
1531     }\r
1532 \r
1533     /* Ensure that a status was set. */\r
1534     IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
1535 \r
1536     return status;\r
1537 }\r
1538 \r
1539 /*-----------------------------------------------------------*/\r
1540 \r
1541 IotMqttError_t IotMqtt_UnsubscribeAsync( IotMqttConnection_t mqttConnection,\r
1542                                          const IotMqttSubscription_t * pSubscriptionList,\r
1543                                          size_t subscriptionCount,\r
1544                                          uint32_t flags,\r
1545                                          const IotMqttCallbackInfo_t * pCallbackInfo,\r
1546                                          IotMqttOperation_t * const pUnsubscribeOperation )\r
1547 {\r
1548     IotMqttError_t status = _subscriptionCommonSetup( IOT_MQTT_UNSUBSCRIBE,\r
1549                                                       mqttConnection,\r
1550                                                       pSubscriptionList,\r
1551                                                       subscriptionCount,\r
1552                                                       flags,\r
1553                                                       pUnsubscribeOperation );\r
1554 \r
1555     if( IOT_MQTT_SUCCESS == status )\r
1556     {\r
1557         /* Remove the MQTT subscription list for an UNSUBSCRIBE. */\r
1558         _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,\r
1559                                                   pSubscriptionList,\r
1560                                                   subscriptionCount );\r
1561 \r
1562         status = _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
1563                                       mqttConnection,\r
1564                                       _getMqttUnsubscribeSerializer( mqttConnection->pSerializer ),\r
1565                                       pSubscriptionList,\r
1566                                       subscriptionCount,\r
1567                                       flags,\r
1568                                       pCallbackInfo,\r
1569                                       pUnsubscribeOperation );\r
1570     }\r
1571     else\r
1572     {\r
1573         EMPTY_ELSE_MARKER;\r
1574     }\r
1575 \r
1576     return status;\r
1577 }\r
1578 \r
1579 /*-----------------------------------------------------------*/\r
1580 \r
1581 IotMqttError_t IotMqtt_UnsubscribeSync( IotMqttConnection_t mqttConnection,\r
1582                                         const IotMqttSubscription_t * pSubscriptionList,\r
1583                                         size_t subscriptionCount,\r
1584                                         uint32_t flags,\r
1585                                         uint32_t timeoutMs )\r
1586 {\r
1587     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1588     IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1589 \r
1590     /* Flags are not used, but the parameter is present for future compatibility. */\r
1591     ( void ) flags;\r
1592 \r
1593     /* Call the asynchronous UNSUBSCRIBE function. */\r
1594     status = IotMqtt_UnsubscribeAsync( mqttConnection,\r
1595                                        pSubscriptionList,\r
1596                                        subscriptionCount,\r
1597                                        IOT_MQTT_FLAG_WAITABLE | MQTT_INTERNAL_FLAG_BLOCK_ON_SEND,\r
1598                                        NULL,\r
1599                                        &unsubscribeOperation );\r
1600 \r
1601     /* Wait for the UNSUBSCRIBE operation to complete. */\r
1602     if( status == IOT_MQTT_STATUS_PENDING )\r
1603     {\r
1604         status = IotMqtt_Wait( unsubscribeOperation, timeoutMs );\r
1605     }\r
1606     else\r
1607     {\r
1608         EMPTY_ELSE_MARKER;\r
1609     }\r
1610 \r
1611     /* Ensure that a status was set. */\r
1612     IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
1613 \r
1614     return status;\r
1615 }\r
1616 \r
1617 /*-----------------------------------------------------------*/\r
1618 \r
1619 IotMqttError_t IotMqtt_PublishAsync( IotMqttConnection_t mqttConnection,\r
1620                                      const IotMqttPublishInfo_t * pPublishInfo,\r
1621                                      uint32_t flags,\r
1622                                      const IotMqttCallbackInfo_t * pCallbackInfo,\r
1623                                      IotMqttOperation_t * const pPublishOperation )\r
1624 {\r
1625     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
1626     _mqttOperation_t * pOperation = NULL;\r
1627     uint8_t ** pPacketIdentifierHigh = NULL;\r
1628 \r
1629     /* Check that IotMqtt_Init was called. */\r
1630     if( _checkInit() == false )\r
1631     {\r
1632         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );\r
1633     }\r
1634     else\r
1635     {\r
1636         EMPTY_ELSE_MARKER;\r
1637     }\r
1638 \r
1639     /* Check that the PUBLISH information is valid. */\r
1640     if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,\r
1641                                   pPublishInfo ) == false )\r
1642     {\r
1643         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1644     }\r
1645     else\r
1646     {\r
1647         EMPTY_ELSE_MARKER;\r
1648     }\r
1649 \r
1650     /* Check that no notification is requested for a QoS 0 publish. */\r
1651     if( pPublishInfo->qos == IOT_MQTT_QOS_0 )\r
1652     {\r
1653         if( pCallbackInfo != NULL )\r
1654         {\r
1655             IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
1656 \r
1657             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1658         }\r
1659         else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )\r
1660         {\r
1661             IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
1662 \r
1663             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1664         }\r
1665         else\r
1666         {\r
1667             EMPTY_ELSE_MARKER;\r
1668         }\r
1669 \r
1670         if( pPublishOperation != NULL )\r
1671         {\r
1672             IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );\r
1673         }\r
1674         else\r
1675         {\r
1676             EMPTY_ELSE_MARKER;\r
1677         }\r
1678     }\r
1679     else\r
1680     {\r
1681         EMPTY_ELSE_MARKER;\r
1682     }\r
1683 \r
1684     /* Check that a reference pointer is provided for a waitable operation. */\r
1685     if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
1686     {\r
1687         if( pPublishOperation == NULL )\r
1688         {\r
1689             IotLogError( "Reference must be provided for a waitable PUBLISH." );\r
1690 \r
1691             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1692         }\r
1693         else\r
1694         {\r
1695             EMPTY_ELSE_MARKER;\r
1696         }\r
1697     }\r
1698     else\r
1699     {\r
1700         EMPTY_ELSE_MARKER;\r
1701     }\r
1702 \r
1703     /* Create a PUBLISH operation. */\r
1704     status = _IotMqtt_CreateOperation( mqttConnection,\r
1705                                        flags,\r
1706                                        pCallbackInfo,\r
1707                                        &pOperation );\r
1708 \r
1709     if( status != IOT_MQTT_SUCCESS )\r
1710     {\r
1711         IOT_GOTO_CLEANUP();\r
1712     }\r
1713     else\r
1714     {\r
1715         EMPTY_ELSE_MARKER;\r
1716     }\r
1717 \r
1718     /* Check the PUBLISH operation data and set the operation type. */\r
1719     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1720     pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;\r
1721 \r
1722     /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */\r
1723     if( mqttConnection->awsIotMqttMode == true )\r
1724     {\r
1725         pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );\r
1726     }\r
1727     else\r
1728     {\r
1729         EMPTY_ELSE_MARKER;\r
1730     }\r
1731 \r
1732     /* Generate a PUBLISH packet from pPublishInfo. */\r
1733     status = _getMqttPublishSerializer( mqttConnection->pSerializer )( pPublishInfo,\r
1734                                                                        &( pOperation->u.operation.pMqttPacket ),\r
1735                                                                        &( pOperation->u.operation.packetSize ),\r
1736                                                                        &( pOperation->u.operation.packetIdentifier ),\r
1737                                                                        pPacketIdentifierHigh );\r
1738 \r
1739     if( status != IOT_MQTT_SUCCESS )\r
1740     {\r
1741         IOT_GOTO_CLEANUP();\r
1742     }\r
1743     else\r
1744     {\r
1745         EMPTY_ELSE_MARKER;\r
1746     }\r
1747 \r
1748     /* Check the serialized MQTT packet. */\r
1749     IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1750     IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1751 \r
1752     /* Initialize PUBLISH retry if retryLimit is set. */\r
1753     if( pPublishInfo->retryLimit > 0 )\r
1754     {\r
1755         /* A QoS 0 PUBLISH may not be retried. */\r
1756         if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1757         {\r
1758             pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;\r
1759             pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;\r
1760         }\r
1761         else\r
1762         {\r
1763             EMPTY_ELSE_MARKER;\r
1764         }\r
1765     }\r
1766     else\r
1767     {\r
1768         EMPTY_ELSE_MARKER;\r
1769     }\r
1770 \r
1771     /* Set the reference, if provided. */\r
1772     if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1773     {\r
1774         if( pPublishOperation != NULL )\r
1775         {\r
1776             *pPublishOperation = pOperation;\r
1777         }\r
1778         else\r
1779         {\r
1780             EMPTY_ELSE_MARKER;\r
1781         }\r
1782     }\r
1783     else\r
1784     {\r
1785         EMPTY_ELSE_MARKER;\r
1786     }\r
1787 \r
1788     /* Send the PUBLISH packet. */\r
1789     if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND )\r
1790     {\r
1791         _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation );\r
1792     }\r
1793     else\r
1794     {\r
1795         status = _IotMqtt_ScheduleOperation( pOperation,\r
1796                                              _IotMqtt_ProcessSend,\r
1797                                              0 );\r
1798 \r
1799         if( status != IOT_MQTT_SUCCESS )\r
1800         {\r
1801             IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",\r
1802                          mqttConnection );\r
1803 \r
1804             /* Clear the previously set (and now invalid) reference. */\r
1805             if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1806             {\r
1807                 if( pPublishOperation != NULL )\r
1808                 {\r
1809                     *pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1810                 }\r
1811                 else\r
1812                 {\r
1813                     EMPTY_ELSE_MARKER;\r
1814                 }\r
1815             }\r
1816             else\r
1817             {\r
1818                 EMPTY_ELSE_MARKER;\r
1819             }\r
1820 \r
1821             IOT_GOTO_CLEANUP();\r
1822         }\r
1823         else\r
1824         {\r
1825             EMPTY_ELSE_MARKER;\r
1826         }\r
1827     }\r
1828 \r
1829     /* Clean up the PUBLISH operation if this function fails. Otherwise, set the\r
1830      * appropriate return code based on QoS. */\r
1831     IOT_FUNCTION_CLEANUP_BEGIN();\r
1832 \r
1833     if( status != IOT_MQTT_SUCCESS )\r
1834     {\r
1835         if( pOperation != NULL )\r
1836         {\r
1837             _IotMqtt_DestroyOperation( pOperation );\r
1838         }\r
1839         else\r
1840         {\r
1841             EMPTY_ELSE_MARKER;\r
1842         }\r
1843     }\r
1844     else\r
1845     {\r
1846         if( pPublishInfo->qos > IOT_MQTT_QOS_0 )\r
1847         {\r
1848             status = IOT_MQTT_STATUS_PENDING;\r
1849         }\r
1850         else\r
1851         {\r
1852             EMPTY_ELSE_MARKER;\r
1853         }\r
1854 \r
1855         IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",\r
1856                     mqttConnection );\r
1857     }\r
1858 \r
1859     IOT_FUNCTION_CLEANUP_END();\r
1860 }\r
1861 \r
1862 /*-----------------------------------------------------------*/\r
1863 \r
1864 IotMqttError_t IotMqtt_PublishSync( IotMqttConnection_t mqttConnection,\r
1865                                     const IotMqttPublishInfo_t * pPublishInfo,\r
1866                                     uint32_t flags,\r
1867                                     uint32_t timeoutMs )\r
1868 {\r
1869     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1870     IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER,\r
1871                        * pPublishOperation = NULL;\r
1872 \r
1873     /* Clear the flags, setting only the "serial" flag. */\r
1874     flags = MQTT_INTERNAL_FLAG_BLOCK_ON_SEND;\r
1875 \r
1876     /* Set the waitable flag and reference for QoS 1 PUBLISH. */\r
1877     if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
1878     {\r
1879         flags |= IOT_MQTT_FLAG_WAITABLE;\r
1880         pPublishOperation = &publishOperation;\r
1881     }\r
1882     else\r
1883     {\r
1884         EMPTY_ELSE_MARKER;\r
1885     }\r
1886 \r
1887     /* Call the asynchronous PUBLISH function. */\r
1888     status = IotMqtt_PublishAsync( mqttConnection,\r
1889                                    pPublishInfo,\r
1890                                    flags,\r
1891                                    NULL,\r
1892                                    pPublishOperation );\r
1893 \r
1894     /* Wait for a queued QoS 1 PUBLISH to complete. */\r
1895     if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
1896     {\r
1897         if( status == IOT_MQTT_STATUS_PENDING )\r
1898         {\r
1899             status = IotMqtt_Wait( publishOperation, timeoutMs );\r
1900         }\r
1901         else\r
1902         {\r
1903             EMPTY_ELSE_MARKER;\r
1904         }\r
1905     }\r
1906     else\r
1907     {\r
1908         EMPTY_ELSE_MARKER;\r
1909     }\r
1910 \r
1911     return status;\r
1912 }\r
1913 \r
1914 /*-----------------------------------------------------------*/\r
1915 \r
1916 IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,\r
1917                              uint32_t timeoutMs )\r
1918 {\r
1919     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
1920     _mqttConnection_t * pMqttConnection = NULL;\r
1921 \r
1922     /* Check that IotMqtt_Init was called. */\r
1923     if( _checkInit() == false )\r
1924     {\r
1925         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );\r
1926     }\r
1927     else\r
1928     {\r
1929         EMPTY_ELSE_MARKER;\r
1930     }\r
1931 \r
1932     /* Validate the given operation reference. */\r
1933     if( _IotMqtt_ValidateOperation( operation ) == false )\r
1934     {\r
1935         status = IOT_MQTT_BAD_PARAMETER;\r
1936     }\r
1937     else\r
1938     {\r
1939         EMPTY_ELSE_MARKER;\r
1940     }\r
1941 \r
1942     /* Check the MQTT connection status. */\r
1943     pMqttConnection = operation->pMqttConnection;\r
1944 \r
1945     if( status == IOT_MQTT_SUCCESS )\r
1946     {\r
1947         IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
1948 \r
1949         if( pMqttConnection->disconnected == true )\r
1950         {\r
1951             IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "\r
1952                          "Operation cannot be waited on.",\r
1953                          pMqttConnection,\r
1954                          IotMqtt_OperationType( operation->u.operation.type ),\r
1955                          operation );\r
1956 \r
1957             status = IOT_MQTT_NETWORK_ERROR;\r
1958         }\r
1959         else\r
1960         {\r
1961             IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",\r
1962                         pMqttConnection,\r
1963                         IotMqtt_OperationType( operation->u.operation.type ),\r
1964                         operation );\r
1965         }\r
1966 \r
1967         IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
1968 \r
1969         /* Only wait on an operation if the MQTT connection is active. */\r
1970         if( status == IOT_MQTT_SUCCESS )\r
1971         {\r
1972             if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),\r
1973                                         timeoutMs ) == false )\r
1974             {\r
1975                 status = IOT_MQTT_TIMEOUT;\r
1976 \r
1977                 /* Attempt to cancel the job of the timed out operation. */\r
1978                 ( void ) _IotMqtt_DecrementOperationReferences( operation, true );\r
1979 \r
1980                 /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */\r
1981                 if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
1982                 {\r
1983                     IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"\r
1984                                  " subscriptions of timed-out SUBSCRIBE.",\r
1985                                  pMqttConnection,\r
1986                                  operation );\r
1987 \r
1988                     _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,\r
1989                                                          operation->u.operation.packetIdentifier,\r
1990                                                          MQTT_REMOVE_ALL_SUBSCRIPTIONS );\r
1991                 }\r
1992                 else\r
1993                 {\r
1994                     EMPTY_ELSE_MARKER;\r
1995                 }\r
1996             }\r
1997             else\r
1998             {\r
1999                 /* Retrieve the status of the completed operation. */\r
2000                 status = operation->u.operation.status;\r
2001             }\r
2002 \r
2003             IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",\r
2004                         pMqttConnection,\r
2005                         IotMqtt_OperationType( operation->u.operation.type ),\r
2006                         operation,\r
2007                         IotMqtt_strerror( status ) );\r
2008         }\r
2009         else\r
2010         {\r
2011             EMPTY_ELSE_MARKER;\r
2012         }\r
2013 \r
2014         /* Wait is finished; decrement operation reference count. */\r
2015         if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )\r
2016         {\r
2017             _IotMqtt_DestroyOperation( operation );\r
2018         }\r
2019         else\r
2020         {\r
2021             EMPTY_ELSE_MARKER;\r
2022         }\r
2023     }\r
2024     else\r
2025     {\r
2026         EMPTY_ELSE_MARKER;\r
2027     }\r
2028 \r
2029     IOT_FUNCTION_EXIT_NO_CLEANUP();\r
2030 }\r
2031 \r
2032 /*-----------------------------------------------------------*/\r
2033 \r
2034 const char * IotMqtt_strerror( IotMqttError_t status )\r
2035 {\r
2036     const char * pMessage = NULL;\r
2037 \r
2038     switch( status )\r
2039     {\r
2040         case IOT_MQTT_SUCCESS:\r
2041             pMessage = "SUCCESS";\r
2042             break;\r
2043 \r
2044         case IOT_MQTT_STATUS_PENDING:\r
2045             pMessage = "PENDING";\r
2046             break;\r
2047 \r
2048         case IOT_MQTT_INIT_FAILED:\r
2049             pMessage = "INITIALIZATION FAILED";\r
2050             break;\r
2051 \r
2052         case IOT_MQTT_BAD_PARAMETER:\r
2053             pMessage = "BAD PARAMETER";\r
2054             break;\r
2055 \r
2056         case IOT_MQTT_NO_MEMORY:\r
2057             pMessage = "NO MEMORY";\r
2058             break;\r
2059 \r
2060         case IOT_MQTT_NETWORK_ERROR:\r
2061             pMessage = "NETWORK ERROR";\r
2062             break;\r
2063 \r
2064         case IOT_MQTT_SCHEDULING_ERROR:\r
2065             pMessage = "SCHEDULING ERROR";\r
2066             break;\r
2067 \r
2068         case IOT_MQTT_BAD_RESPONSE:\r
2069             pMessage = "BAD RESPONSE RECEIVED";\r
2070             break;\r
2071 \r
2072         case IOT_MQTT_TIMEOUT:\r
2073             pMessage = "TIMEOUT";\r
2074             break;\r
2075 \r
2076         case IOT_MQTT_SERVER_REFUSED:\r
2077             pMessage = "SERVER REFUSED";\r
2078             break;\r
2079 \r
2080         case IOT_MQTT_RETRY_NO_RESPONSE:\r
2081             pMessage = "NO RESPONSE";\r
2082             break;\r
2083 \r
2084         case IOT_MQTT_NOT_INITIALIZED:\r
2085             pMessage = "NOT INITIALIZED";\r
2086             break;\r
2087 \r
2088         default:\r
2089             pMessage = "INVALID STATUS";\r
2090             break;\r
2091     }\r
2092 \r
2093     return pMessage;\r
2094 }\r
2095 \r
2096 /*-----------------------------------------------------------*/\r
2097 \r
2098 const char * IotMqtt_OperationType( IotMqttOperationType_t operation )\r
2099 {\r
2100     const char * pMessage = NULL;\r
2101 \r
2102     switch( operation )\r
2103     {\r
2104         case IOT_MQTT_CONNECT:\r
2105             pMessage = "CONNECT";\r
2106             break;\r
2107 \r
2108         case IOT_MQTT_PUBLISH_TO_SERVER:\r
2109             pMessage = "PUBLISH";\r
2110             break;\r
2111 \r
2112         case IOT_MQTT_PUBACK:\r
2113             pMessage = "PUBACK";\r
2114             break;\r
2115 \r
2116         case IOT_MQTT_SUBSCRIBE:\r
2117             pMessage = "SUBSCRIBE";\r
2118             break;\r
2119 \r
2120         case IOT_MQTT_UNSUBSCRIBE:\r
2121             pMessage = "UNSUBSCRIBE";\r
2122             break;\r
2123 \r
2124         case IOT_MQTT_PINGREQ:\r
2125             pMessage = "PINGREQ";\r
2126             break;\r
2127 \r
2128         case IOT_MQTT_DISCONNECT:\r
2129             pMessage = "DISCONNECT";\r
2130             break;\r
2131 \r
2132         default:\r
2133             pMessage = "INVALID OPERATION";\r
2134             break;\r
2135     }\r
2136 \r
2137     return pMessage;\r
2138 }\r
2139 \r
2140 /*-----------------------------------------------------------*/\r
2141 \r
2142 /* Provide access to internal functions and variables if testing. */\r
2143 #if IOT_BUILD_TESTS == 1\r
2144     #include "iot_test_access_mqtt_api.c"\r
2145 #endif\r