]> git.sur5r.net Git - freertos/blob - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
23133f46b78776713c4a39d5f1088ae7e5038fc0
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_api.c
1 /*\r
2  * Amazon FreeRTOS MQTT V2.0.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://aws.amazon.com/freertos\r
23  * http://www.FreeRTOS.org\r
24  */\r
25 \r
26 /**\r
27  * @file iot_mqtt_api.c\r
28  * @brief Implements most user-facing functions of the MQTT library.\r
29  */\r
30 \r
31 /* The config header is always included first. */\r
32 #include "iot_config.h"\r
33 \r
34 /* Standard includes. */\r
35 #include <string.h>\r
36 \r
37 /* Error handling include. */\r
38 #include "private/iot_error.h"\r
39 \r
40 /* MQTT internal include. */\r
41 #include "private/iot_mqtt_internal.h"\r
42 \r
43 /* Platform layer includes. */\r
44 #include "platform/iot_clock.h"\r
45 #include "platform/iot_threads.h"\r
46 \r
47 /* Validate MQTT configuration settings. */\r
48 #if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1\r
49     #error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1."\r
50 #endif\r
51 #if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1\r
52     #error "IOT_MQTT_ENABLE_METRICS must be 0 or 1."\r
53 #endif\r
54 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1\r
55     #error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1."\r
56 #endif\r
57 #if IOT_MQTT_RESPONSE_WAIT_MS <= 0\r
58     #error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative."\r
59 #endif\r
60 #if IOT_MQTT_RETRY_MS_CEILING <= 0\r
61     #error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative."\r
62 #endif\r
63 \r
64 /*-----------------------------------------------------------*/\r
65 \r
66 /**\r
67  * @brief Set the unsubscribed flag of an MQTT subscription.\r
68  *\r
69  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
70  * @param[in] pMatch Not used.\r
71  *\r
72  * @return Always returns `true`.\r
73  */\r
74 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
75                                               void * pMatch );\r
76 \r
77 /**\r
78  * @brief Destroy an MQTT subscription if its reference count is 0.\r
79  *\r
80  * @param[in] pData The subscription to destroy. This parameter is of type\r
81  * `void*` for compatibility with [free]\r
82  * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
83  */\r
84 static void _mqttSubscription_tryDestroy( void * pData );\r
85 \r
86 /**\r
87  * @brief Decrement the reference count of an MQTT operation and attempt to\r
88  * destroy it.\r
89  *\r
90  * @param[in] pData The operation data to destroy. This parameter is of type\r
91  * `void*` for compatibility with [free]\r
92  * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
93  */\r
94 static void _mqttOperation_tryDestroy( void * pData );\r
95 \r
96 /**\r
97  * @brief Create a keep-alive job for an MQTT connection.\r
98  *\r
99  * @param[in] pNetworkInfo User-provided network information for the new\r
100  * connection.\r
101  * @param[in] keepAliveSeconds User-provided keep-alive interval.\r
102  * @param[out] pMqttConnection The MQTT connection associated with the keep-alive.\r
103  *\r
104  * @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
105  */\r
106 static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
107                                  uint16_t keepAliveSeconds,\r
108                                  _mqttConnection_t * pMqttConnection );\r
109 \r
110 /**\r
111  * @brief Creates a new MQTT connection and initializes its members.\r
112  *\r
113  * @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server.\r
114  * @param[in] pNetworkInfo User-provided network information for the new\r
115  * connection.\r
116  * @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection.\r
117  *\r
118  * @return Pointer to a newly-created MQTT connection; `NULL` on failure.\r
119  */\r
120 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
121                                                   const IotMqttNetworkInfo_t * pNetworkInfo,\r
122                                                   uint16_t keepAliveSeconds );\r
123 \r
124 /**\r
125  * @brief Destroys the members of an MQTT connection.\r
126  *\r
127  * @param[in] pMqttConnection Which connection to destroy.\r
128  */\r
129 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection );\r
130 \r
131 /**\r
132  * @brief The common component of both @ref mqtt_function_subscribe and @ref\r
133  * mqtt_function_unsubscribe.\r
134  *\r
135  * See @ref mqtt_function_subscribe or @ref mqtt_function_unsubscribe for a\r
136  * description of the parameters and return values.\r
137  */\r
138 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
139                                            IotMqttConnection_t mqttConnection,\r
140                                            const IotMqttSubscription_t * pSubscriptionList,\r
141                                            size_t subscriptionCount,\r
142                                            uint32_t flags,\r
143                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
144                                            IotMqttOperation_t * pOperationReference );\r
145 \r
146 /*-----------------------------------------------------------*/\r
147 \r
148 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
149                                               void * pMatch )\r
150 {\r
151     /* Because this function is called from a container function, the given link\r
152      * must never be NULL. */\r
153     IotMqtt_Assert( pSubscriptionLink != NULL );\r
154 \r
155     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
156                                                              pSubscriptionLink,\r
157                                                              link );\r
158 \r
159     /* Silence warnings about unused parameters. */\r
160     ( void ) pMatch;\r
161 \r
162     /* Set the unsubscribed flag. */\r
163     pSubscription->unsubscribed = true;\r
164 \r
165     return true;\r
166 }\r
167 \r
168 /*-----------------------------------------------------------*/\r
169 \r
170 static void _mqttSubscription_tryDestroy( void * pData )\r
171 {\r
172     _mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData;\r
173 \r
174     /* Reference count must not be negative. */\r
175     IotMqtt_Assert( pSubscription->references >= 0 );\r
176 \r
177     /* Unsubscribed flag should be set. */\r
178     IotMqtt_Assert( pSubscription->unsubscribed == true );\r
179 \r
180     /* Free the subscription if it has no references. */\r
181     if( pSubscription->references == 0 )\r
182     {\r
183         IotMqtt_FreeSubscription( pSubscription );\r
184     }\r
185     else\r
186     {\r
187         EMPTY_ELSE_MARKER;\r
188     }\r
189 }\r
190 \r
191 /*-----------------------------------------------------------*/\r
192 \r
193 static void _mqttOperation_tryDestroy( void * pData )\r
194 {\r
195     _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData;\r
196     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
197 \r
198     /* Incoming PUBLISH operations may always be freed. */\r
199     if( pOperation->incomingPublish == true )\r
200     {\r
201         /* Cancel the incoming PUBLISH operation's job. */\r
202         taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
203                                                 pOperation->job,\r
204                                                 NULL );\r
205 \r
206         /* If the operation's job was not canceled, it must be already executing.\r
207          * Any other return value is invalid. */\r
208         IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
209                         ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
210 \r
211         /* Check if the incoming PUBLISH job was canceled. */\r
212         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
213         {\r
214             /* Job was canceled. Process incoming PUBLISH now to clean up. */\r
215             _IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL,\r
216                                              pOperation->job,\r
217                                              pOperation );\r
218         }\r
219         else\r
220         {\r
221             /* The executing job will process the PUBLISH, so nothing is done here. */\r
222             EMPTY_ELSE_MARKER;\r
223         }\r
224     }\r
225     else\r
226     {\r
227         /* Decrement reference count and destroy operation if possible. */\r
228         if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true )\r
229         {\r
230             _IotMqtt_DestroyOperation( pOperation );\r
231         }\r
232         else\r
233         {\r
234             EMPTY_ELSE_MARKER;\r
235         }\r
236     }\r
237 }\r
238 \r
239 /*-----------------------------------------------------------*/\r
240 \r
241 static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
242                                  uint16_t keepAliveSeconds,\r
243                                  _mqttConnection_t * pMqttConnection )\r
244 {\r
245     bool status = true;\r
246     IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
247     IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS;\r
248 \r
249     /* Network information is not used when MQTT packet serializers are disabled. */\r
250     ( void ) pNetworkInfo;\r
251 \r
252     /* Default PINGREQ serializer function. */\r
253     IotMqttError_t ( * serializePingreq )( uint8_t **,\r
254                                            size_t * ) = _IotMqtt_SerializePingreq;\r
255 \r
256     /* Convert the keep-alive interval to milliseconds. */\r
257     pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;\r
258     pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
259 \r
260     /* Choose a PINGREQ serializer function. */\r
261     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
262         if( pNetworkInfo->pMqttSerializer != NULL )\r
263         {\r
264             if( pNetworkInfo->pMqttSerializer->serialize.pingreq != NULL )\r
265             {\r
266                 serializePingreq = pNetworkInfo->pMqttSerializer->serialize.pingreq;\r
267             }\r
268             else\r
269             {\r
270                 EMPTY_ELSE_MARKER;\r
271             }\r
272         }\r
273         else\r
274         {\r
275             EMPTY_ELSE_MARKER;\r
276         }\r
277     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
278 \r
279     /* Generate a PINGREQ packet. */\r
280     serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),\r
281                                         &( pMqttConnection->pingreqPacketSize ) );\r
282 \r
283     if( serializeStatus != IOT_MQTT_SUCCESS )\r
284     {\r
285         IotLogError( "Failed to allocate PINGREQ packet for new connection." );\r
286 \r
287         status = false;\r
288     }\r
289     else\r
290     {\r
291         /* Create the task pool job that processes keep-alive. */\r
292         jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
293                                            pMqttConnection,\r
294                                            &( pMqttConnection->keepAliveJobStorage ),\r
295                                            &( pMqttConnection->keepAliveJob ) );\r
296 \r
297         /* Task pool job creation for a pre-allocated job should never fail.\r
298          * Abort the program if it does. */\r
299         if( jobStatus != IOT_TASKPOOL_SUCCESS )\r
300         {\r
301             IotLogError( "Failed to create keep-alive job for new connection." );\r
302 \r
303             IotMqtt_Assert( false );\r
304         }\r
305         else\r
306         {\r
307             EMPTY_ELSE_MARKER;\r
308         }\r
309 \r
310         /* Keep-alive references its MQTT connection, so increment reference. */\r
311         ( pMqttConnection->references )++;\r
312     }\r
313 \r
314     return status;\r
315 }\r
316 \r
317 /*-----------------------------------------------------------*/\r
318 \r
319 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
320                                                   const IotMqttNetworkInfo_t * pNetworkInfo,\r
321                                                   uint16_t keepAliveSeconds )\r
322 {\r
323     IOT_FUNCTION_ENTRY( bool, true );\r
324     _mqttConnection_t * pMqttConnection = NULL;\r
325     bool referencesMutexCreated = false, subscriptionMutexCreated = false;\r
326 \r
327     /* Allocate memory for the new MQTT connection. */\r
328     pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) );\r
329 \r
330     if( pMqttConnection == NULL )\r
331     {\r
332         IotLogError( "Failed to allocate memory for new connection." );\r
333 \r
334         IOT_SET_AND_GOTO_CLEANUP( false );\r
335     }\r
336     else\r
337     {\r
338         /* Clear the MQTT connection, then copy the MQTT server mode, network\r
339          * interface, and disconnect callback. */\r
340         ( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) );\r
341         pMqttConnection->awsIotMqttMode = awsIotMqttMode;\r
342         pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface;\r
343         pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback;\r
344 \r
345         /* Start a new MQTT connection with a reference count of 1. */\r
346         pMqttConnection->references = 1;\r
347     }\r
348 \r
349     /* Create the references mutex for a new connection. It is a recursive mutex. */\r
350     referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true );\r
351 \r
352     if( referencesMutexCreated == false )\r
353     {\r
354         IotLogError( "Failed to create references mutex for new connection." );\r
355 \r
356         IOT_SET_AND_GOTO_CLEANUP( false );\r
357     }\r
358     else\r
359     {\r
360         EMPTY_ELSE_MARKER;\r
361     }\r
362 \r
363     /* Create the subscription mutex for a new connection. */\r
364     subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false );\r
365 \r
366     if( subscriptionMutexCreated == false )\r
367     {\r
368         IotLogError( "Failed to create subscription mutex for new connection." );\r
369 \r
370         IOT_SET_AND_GOTO_CLEANUP( false );\r
371     }\r
372     else\r
373     {\r
374         EMPTY_ELSE_MARKER;\r
375     }\r
376 \r
377     /* Create the new connection's subscription and operation lists. */\r
378     IotListDouble_Create( &( pMqttConnection->subscriptionList ) );\r
379     IotListDouble_Create( &( pMqttConnection->pendingProcessing ) );\r
380     IotListDouble_Create( &( pMqttConnection->pendingResponse ) );\r
381 \r
382     /* AWS IoT service limits set minimum and maximum values for keep-alive interval.\r
383      * Adjust the user-provided keep-alive interval based on these requirements. */\r
384     if( awsIotMqttMode == true )\r
385     {\r
386         if( keepAliveSeconds < AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE )\r
387         {\r
388             keepAliveSeconds = AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE;\r
389         }\r
390         else if( keepAliveSeconds > AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE )\r
391         {\r
392             keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;\r
393         }\r
394         else if( keepAliveSeconds == 0 )\r
395         {\r
396             keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;\r
397         }\r
398         else\r
399         {\r
400             EMPTY_ELSE_MARKER;\r
401         }\r
402     }\r
403     else\r
404     {\r
405         EMPTY_ELSE_MARKER;\r
406     }\r
407 \r
408     /* Check if keep-alive is active for this connection. */\r
409     if( keepAliveSeconds != 0 )\r
410     {\r
411         if( _createKeepAliveJob( pNetworkInfo,\r
412                                  keepAliveSeconds,\r
413                                  pMqttConnection ) == false )\r
414         {\r
415             IOT_SET_AND_GOTO_CLEANUP( false );\r
416         }\r
417         else\r
418         {\r
419             EMPTY_ELSE_MARKER;\r
420         }\r
421     }\r
422     else\r
423     {\r
424         EMPTY_ELSE_MARKER;\r
425     }\r
426 \r
427     /* Clean up mutexes and connection if this function failed. */\r
428     IOT_FUNCTION_CLEANUP_BEGIN();\r
429 \r
430     if( status == false )\r
431     {\r
432         if( subscriptionMutexCreated == true )\r
433         {\r
434             IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
435         }\r
436         else\r
437         {\r
438             EMPTY_ELSE_MARKER;\r
439         }\r
440 \r
441         if( referencesMutexCreated == true )\r
442         {\r
443             IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
444         }\r
445         else\r
446         {\r
447             EMPTY_ELSE_MARKER;\r
448         }\r
449 \r
450         if( pMqttConnection != NULL )\r
451         {\r
452             IotMqtt_FreeConnection( pMqttConnection );\r
453             pMqttConnection = NULL;\r
454         }\r
455         else\r
456         {\r
457             EMPTY_ELSE_MARKER;\r
458         }\r
459     }\r
460     else\r
461     {\r
462         EMPTY_ELSE_MARKER;\r
463     }\r
464 \r
465     return pMqttConnection;\r
466 }\r
467 \r
468 /*-----------------------------------------------------------*/\r
469 \r
470 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )\r
471 {\r
472     IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
473 \r
474     /* Clean up keep-alive if still allocated. */\r
475     if( pMqttConnection->keepAliveMs != 0 )\r
476     {\r
477         IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
478 \r
479         _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
480 \r
481         /* Clear data about the keep-alive. */\r
482         pMqttConnection->keepAliveMs = 0;\r
483         pMqttConnection->pPingreqPacket = NULL;\r
484         pMqttConnection->pingreqPacketSize = 0;\r
485 \r
486         /* Decrement reference count. */\r
487         pMqttConnection->references--;\r
488     }\r
489     else\r
490     {\r
491         EMPTY_ELSE_MARKER;\r
492     }\r
493 \r
494     /* A connection to be destroyed should have no keep-alive and at most 1\r
495      * reference. */\r
496     IotMqtt_Assert( pMqttConnection->references <= 1 );\r
497     IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );\r
498     IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );\r
499     IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );\r
500 \r
501     /* Remove all subscriptions. */\r
502     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
503     IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
504                                     _mqttSubscription_setUnsubscribe,\r
505                                     NULL,\r
506                                     _mqttSubscription_tryDestroy,\r
507                                     offsetof( _mqttSubscription_t, link ) );\r
508     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
509 \r
510     /* Destroy an owned network connection. */\r
511     if( pMqttConnection->ownNetworkConnection == true )\r
512     {\r
513         networkStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );\r
514 \r
515         if( networkStatus != IOT_NETWORK_SUCCESS )\r
516         {\r
517             IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",\r
518                         pMqttConnection );\r
519         }\r
520         else\r
521         {\r
522             IotLogInfo( "(MQTT connection %p) Network connection destroyed.",\r
523                         pMqttConnection );\r
524         }\r
525     }\r
526     else\r
527     {\r
528         EMPTY_ELSE_MARKER;\r
529     }\r
530 \r
531     /* Destroy mutexes. */\r
532     IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
533     IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
534 \r
535     IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );\r
536 \r
537     /* Free connection. */\r
538     IotMqtt_FreeConnection( pMqttConnection );\r
539 }\r
540 \r
541 /*-----------------------------------------------------------*/\r
542 \r
543 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
544                                            IotMqttConnection_t mqttConnection,\r
545                                            const IotMqttSubscription_t * pSubscriptionList,\r
546                                            size_t subscriptionCount,\r
547                                            uint32_t flags,\r
548                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
549                                            IotMqttOperation_t * pOperationReference )\r
550 {\r
551     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
552     _mqttOperation_t * pSubscriptionOperation = NULL;\r
553 \r
554     /* Subscription serializer function. */\r
555     IotMqttError_t ( * serializeSubscription )( const IotMqttSubscription_t *,\r
556                                                 size_t,\r
557                                                 uint8_t **,\r
558                                                 size_t *,\r
559                                                 uint16_t * ) = NULL;\r
560 \r
561     /* This function should only be called for subscribe or unsubscribe. */\r
562     IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||\r
563                     ( operation == IOT_MQTT_UNSUBSCRIBE ) );\r
564 \r
565     /* Check that all elements in the subscription list are valid. */\r
566     if( _IotMqtt_ValidateSubscriptionList( operation,\r
567                                            mqttConnection->awsIotMqttMode,\r
568                                            pSubscriptionList,\r
569                                            subscriptionCount ) == false )\r
570     {\r
571         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
572     }\r
573     else\r
574     {\r
575         EMPTY_ELSE_MARKER;\r
576     }\r
577 \r
578     /* Check that a reference pointer is provided for a waitable operation. */\r
579     if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
580     {\r
581         if( pOperationReference == NULL )\r
582         {\r
583             IotLogError( "Reference must be provided for a waitable %s.",\r
584                          IotMqtt_OperationType( operation ) );\r
585 \r
586             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
587         }\r
588         else\r
589         {\r
590             EMPTY_ELSE_MARKER;\r
591         }\r
592     }\r
593     else\r
594     {\r
595         EMPTY_ELSE_MARKER;\r
596     }\r
597 \r
598     /* Choose a subscription serialize function. */\r
599     if( operation == IOT_MQTT_SUBSCRIBE )\r
600     {\r
601         serializeSubscription = _IotMqtt_SerializeSubscribe;\r
602 \r
603         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
604             if( mqttConnection->pSerializer != NULL )\r
605             {\r
606                 if( mqttConnection->pSerializer->serialize.subscribe != NULL )\r
607                 {\r
608                     serializeSubscription = mqttConnection->pSerializer->serialize.subscribe;\r
609                 }\r
610                 else\r
611                 {\r
612                     EMPTY_ELSE_MARKER;\r
613                 }\r
614             }\r
615             else\r
616             {\r
617                 EMPTY_ELSE_MARKER;\r
618             }\r
619         #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
620     }\r
621     else\r
622     {\r
623         serializeSubscription = _IotMqtt_SerializeUnsubscribe;\r
624 \r
625         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
626             if( mqttConnection->pSerializer != NULL )\r
627             {\r
628                 if( mqttConnection->pSerializer->serialize.unsubscribe != NULL )\r
629                 {\r
630                     serializeSubscription = mqttConnection->pSerializer->serialize.unsubscribe;\r
631                 }\r
632                 else\r
633                 {\r
634                     EMPTY_ELSE_MARKER;\r
635                 }\r
636             }\r
637             else\r
638             {\r
639                 EMPTY_ELSE_MARKER;\r
640             }\r
641         #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
642     }\r
643 \r
644     /* Remove the MQTT subscription list for an UNSUBSCRIBE. */\r
645     if( operation == IOT_MQTT_UNSUBSCRIBE )\r
646     {\r
647         _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,\r
648                                                   pSubscriptionList,\r
649                                                   subscriptionCount );\r
650     }\r
651     else\r
652     {\r
653         EMPTY_ELSE_MARKER;\r
654     }\r
655 \r
656     /* Create a subscription operation. */\r
657     status = _IotMqtt_CreateOperation( mqttConnection,\r
658                                        flags,\r
659                                        pCallbackInfo,\r
660                                        &pSubscriptionOperation );\r
661 \r
662     if( status != IOT_MQTT_SUCCESS )\r
663     {\r
664         IOT_GOTO_CLEANUP();\r
665     }\r
666 \r
667     /* Check the subscription operation data and set the operation type. */\r
668     IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
669     IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );\r
670     pSubscriptionOperation->u.operation.type = operation;\r
671 \r
672     /* Generate a subscription packet from the subscription list. */\r
673     status = serializeSubscription( pSubscriptionList,\r
674                                     subscriptionCount,\r
675                                     &( pSubscriptionOperation->u.operation.pMqttPacket ),\r
676                                     &( pSubscriptionOperation->u.operation.packetSize ),\r
677                                     &( pSubscriptionOperation->u.operation.packetIdentifier ) );\r
678 \r
679     if( status != IOT_MQTT_SUCCESS )\r
680     {\r
681         IOT_GOTO_CLEANUP();\r
682     }\r
683 \r
684     /* Check the serialized MQTT packet. */\r
685     IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );\r
686     IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );\r
687 \r
688     /* Add the subscription list for a SUBSCRIBE. */\r
689     if( operation == IOT_MQTT_SUBSCRIBE )\r
690     {\r
691         status = _IotMqtt_AddSubscriptions( mqttConnection,\r
692                                             pSubscriptionOperation->u.operation.packetIdentifier,\r
693                                             pSubscriptionList,\r
694                                             subscriptionCount );\r
695 \r
696         if( status != IOT_MQTT_SUCCESS )\r
697         {\r
698             IOT_GOTO_CLEANUP();\r
699         }\r
700     }\r
701 \r
702     /* Set the reference, if provided. */\r
703     if( pOperationReference != NULL )\r
704     {\r
705         *pOperationReference = pSubscriptionOperation;\r
706     }\r
707 \r
708     /* Schedule the subscription operation for network transmission. */\r
709     status = _IotMqtt_ScheduleOperation( pSubscriptionOperation,\r
710                                          _IotMqtt_ProcessSend,\r
711                                          0 );\r
712 \r
713     if( status != IOT_MQTT_SUCCESS )\r
714     {\r
715         IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",\r
716                      mqttConnection,\r
717                      IotMqtt_OperationType( operation ) );\r
718 \r
719         if( operation == IOT_MQTT_SUBSCRIBE )\r
720         {\r
721             _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,\r
722                                                  pSubscriptionOperation->u.operation.packetIdentifier,\r
723                                                  -1 );\r
724         }\r
725 \r
726         /* Clear the previously set (and now invalid) reference. */\r
727         if( pOperationReference != NULL )\r
728         {\r
729             *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;\r
730         }\r
731 \r
732         IOT_GOTO_CLEANUP();\r
733     }\r
734 \r
735     /* Clean up if this function failed. */\r
736     IOT_FUNCTION_CLEANUP_BEGIN();\r
737 \r
738     if( status != IOT_MQTT_SUCCESS )\r
739     {\r
740         if( pSubscriptionOperation != NULL )\r
741         {\r
742             _IotMqtt_DestroyOperation( pSubscriptionOperation );\r
743         }\r
744     }\r
745     else\r
746     {\r
747         status = IOT_MQTT_STATUS_PENDING;\r
748 \r
749         IotLogInfo( "(MQTT connection %p) %s operation scheduled.",\r
750                     mqttConnection,\r
751                     IotMqtt_OperationType( operation ) );\r
752     }\r
753 \r
754     IOT_FUNCTION_CLEANUP_END();\r
755 }\r
756 \r
757 /*-----------------------------------------------------------*/\r
758 \r
759 bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
760 {\r
761     bool disconnected = false;\r
762 \r
763     /* Lock the mutex protecting the reference count. */\r
764     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
765 \r
766     /* Reference count must not be negative. */\r
767     IotMqtt_Assert( pMqttConnection->references >= 0 );\r
768 \r
769     /* Read connection status. */\r
770     disconnected = pMqttConnection->disconnected;\r
771 \r
772     /* Increment the connection's reference count if it is not disconnected. */\r
773     if( disconnected == false )\r
774     {\r
775         ( pMqttConnection->references )++;\r
776         IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
777                      pMqttConnection,\r
778                      ( long int ) pMqttConnection->references - 1,\r
779                      ( long int ) pMqttConnection->references );\r
780     }\r
781     else\r
782     {\r
783         IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );\r
784     }\r
785 \r
786     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
787 \r
788     return( disconnected == false );\r
789 }\r
790 \r
791 /*-----------------------------------------------------------*/\r
792 \r
793 void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
794 {\r
795     bool destroyConnection = false;\r
796 \r
797     /* Lock the mutex protecting the reference count. */\r
798     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
799 \r
800     /* Decrement reference count. It must not be negative. */\r
801     ( pMqttConnection->references )--;\r
802     IotMqtt_Assert( pMqttConnection->references >= 0 );\r
803 \r
804     IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
805                  pMqttConnection,\r
806                  ( long int ) pMqttConnection->references + 1,\r
807                  ( long int ) pMqttConnection->references );\r
808 \r
809     /* Check if this connection may be destroyed. */\r
810     if( pMqttConnection->references == 0 )\r
811     {\r
812         destroyConnection = true;\r
813     }\r
814     else\r
815     {\r
816         EMPTY_ELSE_MARKER;\r
817     }\r
818 \r
819     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
820 \r
821     /* Destroy an unreferenced MQTT connection. */\r
822     if( destroyConnection == true )\r
823     {\r
824         IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",\r
825                      pMqttConnection );\r
826         _destroyMqttConnection( pMqttConnection );\r
827     }\r
828     else\r
829     {\r
830         EMPTY_ELSE_MARKER;\r
831     }\r
832 }\r
833 \r
834 /*-----------------------------------------------------------*/\r
835 \r
836 IotMqttError_t IotMqtt_Init( void )\r
837 {\r
838     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
839 \r
840     /* Call any additional serializer initialization function if serializer\r
841      * overrides are enabled. */\r
842     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
843         #ifdef _IotMqtt_InitSerializeAdditional\r
844             if( _IotMqtt_InitSerializeAdditional() == false )\r
845             {\r
846                 status = IOT_MQTT_INIT_FAILED;\r
847             }\r
848             else\r
849             {\r
850                 EMPTY_ELSE_MARKER;\r
851             }\r
852         #endif\r
853     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
854 \r
855     /* Log initialization status. */\r
856     if( status != IOT_MQTT_SUCCESS ) //_RB_ This will generate compiler warnings if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0\r
857     {\r
858         IotLogError( "Failed to initialize MQTT library serializer. " );\r
859     }\r
860     else\r
861     {\r
862         IotLogInfo( "MQTT library successfully initialized." );\r
863     }\r
864 \r
865     return status;\r
866 }\r
867 \r
868 /*-----------------------------------------------------------*/\r
869 \r
/**
 * @brief Library-wide cleanup counterpart of IotMqtt_Init.
 *
 * Invokes the additional serializer cleanup hook when serializer overrides
 * are enabled and the hook is defined, then logs completion.
 */
void IotMqtt_Cleanup( void )
{
    /* Run the optional serializer cleanup hook, if one is configured. */
    #if ( IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 ) && defined( _IotMqtt_CleanupSerializeAdditional )
        _IotMqtt_CleanupSerializeAdditional();
    #endif

    IotLogInfo( "MQTT library cleanup done." );
}
882 \r
883 /*-----------------------------------------------------------*/\r
884 \r
885 IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,\r
886                                 const IotMqttConnectInfo_t * pConnectInfo,\r
887                                 uint32_t timeoutMs,\r
888                                 IotMqttConnection_t * const pMqttConnection )\r
889 {\r
890     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
891     bool networkCreated = false, ownNetworkConnection = false;\r
892     IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
893     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
894     void * pNetworkConnection = NULL;\r
895     _mqttOperation_t * pOperation = NULL;\r
896     _mqttConnection_t * pNewMqttConnection = NULL;\r
897 \r
898     /* Default CONNECT serializer function. */\r
899     IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, //_RB_ Needs to be a typedef to make it easier to rease and more maintainable should the prototype change.\r
900                                            uint8_t **,\r
901                                            size_t * ) = _IotMqtt_SerializeConnect;\r
902 \r
903     /* Network info must not be NULL. */\r
904     if( pNetworkInfo == NULL )\r
905     {\r
906         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
907     }\r
908     else\r
909     {\r
910         EMPTY_ELSE_MARKER;\r
911     }\r
912 \r
913     /* Validate network interface and connect info. */\r
914     if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) //_RB_ A lot of code in here that could be replaced by asserts().\r
915     {\r
916         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
917     }\r
918     else\r
919     {\r
920         EMPTY_ELSE_MARKER;\r
921     }\r
922 \r
923     /* If will info is provided, check that it is valid. */\r
924     if( pConnectInfo->pWillInfo != NULL )\r
925     {\r
926         if( _IotMqtt_ValidatePublish( pConnectInfo->awsIotMqttMode,\r
927                                       pConnectInfo->pWillInfo ) == false )\r
928         {\r
929             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
930         }\r
931         else if( pConnectInfo->pWillInfo->payloadLength > UINT16_MAX )\r
932         {\r
933             /* Will message payloads cannot be larger than 65535. This restriction\r
934              * applies only to will messages, and not normal PUBLISH messages. */\r
935             IotLogError( "Will payload cannot be larger than 65535." );\r
936 \r
937             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
938         }\r
939         else\r
940         {\r
941             EMPTY_ELSE_MARKER;\r
942         }\r
943     }\r
944     else\r
945     {\r
946         EMPTY_ELSE_MARKER;\r
947     }\r
948 \r
949     /* If previous subscriptions are provided, check that they are valid. */\r
950     if( pConnectInfo->cleanSession == false )\r
951     {\r
952         if( pConnectInfo->pPreviousSubscriptions != NULL )\r
953         {\r
954             if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,\r
955                                                    pConnectInfo->awsIotMqttMode,\r
956                                                    pConnectInfo->pPreviousSubscriptions,\r
957                                                    pConnectInfo->previousSubscriptionCount ) == false )\r
958             {\r
959                 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
960             }\r
961             else\r
962             {\r
963                 EMPTY_ELSE_MARKER;\r
964             }\r
965         }\r
966         else\r
967         {\r
968             EMPTY_ELSE_MARKER;\r
969         }\r
970     }\r
971     else\r
972     {\r
973         EMPTY_ELSE_MARKER;\r
974     }\r
975 \r
976     /* Create a new MQTT connection if requested. Otherwise, copy the existing\r
977      * network connection. */\r
978     if( pNetworkInfo->createNetworkConnection == true )\r
979     {\r
980         networkStatus = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,\r
981                                                                  pNetworkInfo->u.setup.pNetworkCredentialInfo,\r
982                                                                  &pNetworkConnection );\r
983 \r
984         if( networkStatus == IOT_NETWORK_SUCCESS )\r
985         {\r
986             networkCreated = true;\r
987 \r
988             /* This MQTT connection owns the network connection it created and\r
989              * should destroy it on cleanup. */\r
990             ownNetworkConnection = true;\r
991         }\r
992         else\r
993         {\r
994             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
995         }\r
996     }\r
997     else\r
998     {\r
999         pNetworkConnection = pNetworkInfo->u.pNetworkConnection;\r
1000         networkCreated = true;\r
1001     }\r
1002 \r
1003     IotLogInfo( "Establishing new MQTT connection." );\r
1004 \r
1005     /* Initialize a new MQTT connection object. *///_RB_ Initialise, as per the comment, or create, as per the function name?  I don't think this does create a connection as that happens below.\r
1006     pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,\r
1007                                                 pNetworkInfo,\r
1008                                                 pConnectInfo->keepAliveSeconds );\r
1009 \r
1010     if( pNewMqttConnection == NULL )\r
1011     {\r
1012         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
1013     }\r
1014     else\r
1015     {\r
1016         /* Set the network connection associated with the MQTT connection. */\r
1017         pNewMqttConnection->pNetworkConnection = pNetworkConnection;\r
1018         pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;\r
1019 \r
1020         /* Set the MQTT packet serializer overrides. */\r
1021         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1022             pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;\r
1023         #endif\r
1024     }\r
1025 \r
1026     /* Set the MQTT receive callback. */\r
1027     networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,\r
1028                                                                                IotMqtt_ReceiveCallback,\r
1029                                                                                pNewMqttConnection );\r
1030 \r
1031     if( networkStatus != IOT_NETWORK_SUCCESS )\r
1032     {\r
1033         IotLogError( "Failed to set MQTT network receive callback." );\r
1034 \r
1035         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
1036     }\r
1037     else\r
1038     {\r
1039         EMPTY_ELSE_MARKER;\r
1040     }\r
1041 \r
1042     /* Create a CONNECT operation. */\r
1043     status = _IotMqtt_CreateOperation( pNewMqttConnection,\r
1044                                        IOT_MQTT_FLAG_WAITABLE,\r
1045                                        NULL,\r
1046                                        &pOperation );\r
1047 \r
1048     if( status != IOT_MQTT_SUCCESS )\r
1049     {\r
1050         IOT_GOTO_CLEANUP();\r
1051     }\r
1052     else\r
1053     {\r
1054         EMPTY_ELSE_MARKER;\r
1055     }\r
1056 \r
1057     /* Ensure the members set by operation creation and serialization\r
1058      * are appropriate for a blocking CONNECT. */\r
1059     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1060     IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
1061                     == IOT_MQTT_FLAG_WAITABLE );\r
1062     IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
1063 \r
1064     /* Set the operation type. */\r
1065     pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
1066 \r
1067     /* Add previous session subscriptions. */\r
1068     if( pConnectInfo->pPreviousSubscriptions != NULL )\r
1069     {\r
1070         /* Previous subscription count should have been validated as nonzero. */\r
1071         IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );\r
1072 \r
1073         status = _IotMqtt_AddSubscriptions( pNewMqttConnection,\r
1074                                             2,\r
1075                                             pConnectInfo->pPreviousSubscriptions,\r
1076                                             pConnectInfo->previousSubscriptionCount );\r
1077 \r
1078         if( status != IOT_MQTT_SUCCESS )\r
1079         {\r
1080             IOT_GOTO_CLEANUP();\r
1081         }\r
1082         else\r
1083         {\r
1084             EMPTY_ELSE_MARKER;\r
1085         }\r
1086     }\r
1087     else\r
1088     {\r
1089         EMPTY_ELSE_MARKER;\r
1090     }\r
1091 \r
1092     /* Choose a CONNECT serializer function. */\r
1093     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1094         if( pNewMqttConnection->pSerializer != NULL )\r
1095         {\r
1096             if( pNewMqttConnection->pSerializer->serialize.connect != NULL )\r
1097             {\r
1098                 serializeConnect = pNewMqttConnection->pSerializer->serialize.connect;\r
1099             }\r
1100             else\r
1101             {\r
1102                 EMPTY_ELSE_MARKER;\r
1103             }\r
1104         }\r
1105         else\r
1106         {\r
1107             EMPTY_ELSE_MARKER;\r
1108         }\r
1109     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
1110 \r
1111     /* Convert the connect info and will info objects to an MQTT CONNECT packet. */\r
1112     status = serializeConnect( pConnectInfo,\r
1113                                &( pOperation->u.operation.pMqttPacket ),\r
1114                                &( pOperation->u.operation.packetSize ) );\r
1115 \r
1116     if( status != IOT_MQTT_SUCCESS )\r
1117     {\r
1118         IOT_GOTO_CLEANUP();\r
1119     }\r
1120     else\r
1121     {\r
1122         EMPTY_ELSE_MARKER;\r
1123     }\r
1124 \r
1125     /* Check the serialized MQTT packet. */\r
1126     IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1127     IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1128 \r
1129     /* Add the CONNECT operation to the send queue for network transmission. */\r
1130     status = _IotMqtt_ScheduleOperation( pOperation, // Why schedule a job if going to wait for comletion?\r
1131                                          _IotMqtt_ProcessSend,\r
1132                                          0 );\r
1133 \r
1134     if( status != IOT_MQTT_SUCCESS )\r
1135     {\r
1136         IotLogError( "Failed to enqueue CONNECT for sending." );\r
1137     }\r
1138     else\r
1139     {\r
1140         /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */\r
1141         status = IotMqtt_Wait( pOperation,\r
1142                                timeoutMs );\r
1143 \r
1144         /* The call to wait cleans up the CONNECT operation, so set the pointer\r
1145          * to NULL. */\r
1146         pOperation = NULL;\r
1147     }\r
1148 \r
1149     /* When a connection is successfully established, schedule keep-alive job. */\r
1150     if( status == IOT_MQTT_SUCCESS )\r
1151     {\r
1152         /* Check if a keep-alive job should be scheduled. */\r
1153         if( pNewMqttConnection->keepAliveMs != 0 )\r
1154         {\r
1155             IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
1156 \r
1157             taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
1158                                                            pNewMqttConnection->keepAliveJob,\r
1159                                                            pNewMqttConnection->nextKeepAliveMs );\r
1160 \r
1161             if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
1162             {\r
1163                 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );\r
1164             }\r
1165             else\r
1166             {\r
1167                 EMPTY_ELSE_MARKER;\r
1168             }\r
1169         }\r
1170         else\r
1171         {\r
1172             EMPTY_ELSE_MARKER;\r
1173         }\r
1174     }\r
1175     else\r
1176     {\r
1177         EMPTY_ELSE_MARKER;\r
1178     }\r
1179 \r
1180     IOT_FUNCTION_CLEANUP_BEGIN();\r
1181 \r
1182     if( status != IOT_MQTT_SUCCESS )\r
1183     {\r
1184         IotLogError( "Failed to establish new MQTT connection, error %s.",\r
1185                      IotMqtt_strerror( status ) );\r
1186 \r
1187         /* The network connection must be closed if it was created. */\r
1188         if( networkCreated == true )\r
1189         {\r
1190             networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );\r
1191 \r
1192             if( networkStatus != IOT_NETWORK_SUCCESS )\r
1193             {\r
1194                 IotLogWarn( "Failed to close network connection." );\r
1195             }\r
1196             else\r
1197             {\r
1198                 IotLogInfo( "Network connection closed on error." );\r
1199             }\r
1200         }\r
1201         else\r
1202         {\r
1203             EMPTY_ELSE_MARKER;\r
1204         }\r
1205 \r
1206         if( pOperation != NULL )\r
1207         {\r
1208             _IotMqtt_DestroyOperation( pOperation );\r
1209         }\r
1210         else\r
1211         {\r
1212             EMPTY_ELSE_MARKER;\r
1213         }\r
1214 \r
1215         if( pNewMqttConnection != NULL )\r
1216         {\r
1217             _destroyMqttConnection( pNewMqttConnection );\r
1218         }\r
1219         else\r
1220         {\r
1221             EMPTY_ELSE_MARKER;\r
1222         }\r
1223     }\r
1224     else\r
1225     {\r
1226         IotLogInfo( "New MQTT connection %p established.", pMqttConnection );\r
1227 \r
1228         /* Set the output parameter. */\r
1229         *pMqttConnection = pNewMqttConnection;\r
1230     }\r
1231 \r
1232     IOT_FUNCTION_CLEANUP_END();\r
1233 }\r
1234 \r
1235 /*-----------------------------------------------------------*/\r
1236 \r
1237 void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,\r
1238                          uint32_t flags )\r
1239 {\r
1240     bool disconnected = false;\r
1241     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1242     _mqttOperation_t * pOperation = NULL;\r
1243 \r
1244     IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );\r
1245 \r
1246     /* Read the connection status. */\r
1247     IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
1248     disconnected = mqttConnection->disconnected;\r
1249     IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
1250 \r
1251     /* Only send a DISCONNECT packet if the connection is active and the "cleanup only"\r
1252      * flag is not set. */\r
1253     if( disconnected == false )\r
1254     {\r
1255         if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == 0 )\r
1256         {\r
1257             /* Create a DISCONNECT operation. This function blocks until the DISCONNECT\r
1258              * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */\r
1259             status = _IotMqtt_CreateOperation( mqttConnection,\r
1260                                                IOT_MQTT_FLAG_WAITABLE,\r
1261                                                NULL,\r
1262                                                &pOperation );\r
1263 \r
1264             if( status == IOT_MQTT_SUCCESS )\r
1265             {\r
1266                 /* Ensure that the members set by operation creation and serialization\r
1267                  * are appropriate for a blocking DISCONNECT. */\r
1268                 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1269                 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
1270                                 == IOT_MQTT_FLAG_WAITABLE );\r
1271                 IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
1272 \r
1273                 /* Set the operation type. */\r
1274                 pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
1275 \r
1276                 /* Choose a disconnect serializer. */\r
1277                 IotMqttError_t ( * serializeDisconnect )( uint8_t **,\r
1278                                                           size_t * ) = _IotMqtt_SerializeDisconnect;\r
1279 \r
1280                 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1281                     if( mqttConnection->pSerializer != NULL )\r
1282                     {\r
1283                         if( mqttConnection->pSerializer->serialize.disconnect != NULL )\r
1284                         {\r
1285                             serializeDisconnect = mqttConnection->pSerializer->serialize.disconnect;\r
1286                         }\r
1287                         else\r
1288                         {\r
1289                             EMPTY_ELSE_MARKER;\r
1290                         }\r
1291                     }\r
1292                     else\r
1293                     {\r
1294                         EMPTY_ELSE_MARKER;\r
1295                     }\r
1296                 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
1297 \r
1298                 /* Generate a DISCONNECT packet. */\r
1299                 status = serializeDisconnect( &( pOperation->u.operation.pMqttPacket ),\r
1300                                               &( pOperation->u.operation.packetSize ) );\r
1301             }\r
1302             else\r
1303             {\r
1304                 EMPTY_ELSE_MARKER;\r
1305             }\r
1306 \r
1307             if( status == IOT_MQTT_SUCCESS )\r
1308             {\r
1309                 /* Check the serialized MQTT packet. */\r
1310                 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1311                 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1312 \r
1313                 /* Schedule the DISCONNECT operation for network transmission. */\r
1314                 if( _IotMqtt_ScheduleOperation( pOperation,\r
1315                                                 _IotMqtt_ProcessSend,\r
1316                                                 0 ) != IOT_MQTT_SUCCESS )\r
1317                 {\r
1318                     IotLogWarn( "(MQTT connection %p) Failed to schedule DISCONNECT for sending.",\r
1319                                 mqttConnection );\r
1320                     _IotMqtt_DestroyOperation( pOperation );\r
1321                 }\r
1322                 else\r
1323                 {\r
1324                     /* Wait a short time for the DISCONNECT packet to be transmitted. */\r
1325                     status = IotMqtt_Wait( pOperation,\r
1326                                            IOT_MQTT_RESPONSE_WAIT_MS );\r
1327 \r
1328                     /* A wait on DISCONNECT should only ever return SUCCESS, TIMEOUT,\r
1329                      * or NETWORK ERROR. */\r
1330                     if( status == IOT_MQTT_SUCCESS )\r
1331                     {\r
1332                         IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );\r
1333                     }\r
1334                     else\r
1335                     {\r
1336                         IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||\r
1337                                         ( status == IOT_MQTT_NETWORK_ERROR ) );\r
1338 \r
1339                         IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",\r
1340                                     mqttConnection,\r
1341                                     IotMqtt_strerror( status ) );\r
1342                     }\r
1343                 }\r
1344             }\r
1345             else\r
1346             {\r
1347                 EMPTY_ELSE_MARKER;\r
1348             }\r
1349         }\r
1350         else\r
1351         {\r
1352             EMPTY_ELSE_MARKER;\r
1353         }\r
1354     }\r
1355     else\r
1356     {\r
1357         EMPTY_ELSE_MARKER;\r
1358     }\r
1359 \r
1360     /* Close the underlying network connection. This also cleans up keep-alive. */\r
1361     _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,\r
1362                                      mqttConnection );\r
1363 \r
1364     /* Check if the connection may be destroyed. */\r
1365     IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
1366 \r
1367     /* At this point, the connection should be marked disconnected. */\r
1368     IotMqtt_Assert( mqttConnection->disconnected == true );\r
1369 \r
1370     /* Attempt cancel and destroy each operation in the connection's lists. */\r
1371     IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),\r
1372                              _mqttOperation_tryDestroy,\r
1373                              offsetof( _mqttOperation_t, link ) );\r
1374 \r
1375     IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),\r
1376                              _mqttOperation_tryDestroy,\r
1377                              offsetof( _mqttOperation_t, link ) );\r
1378 \r
1379     IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
1380 \r
1381     /* Decrement the connection reference count and destroy it if possible. */\r
1382     _IotMqtt_DecrementConnectionReferences( mqttConnection );\r
1383 }\r
1384 \r
1385 /*-----------------------------------------------------------*/\r
1386 \r
1387 IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,\r
1388                                   const IotMqttSubscription_t * pSubscriptionList,\r
1389                                   size_t subscriptionCount,\r
1390                                   uint32_t flags,\r
1391                                   const IotMqttCallbackInfo_t * pCallbackInfo,\r
1392                                   IotMqttOperation_t * pSubscribeOperation )\r
1393 {\r
1394     return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
1395                                 mqttConnection,\r
1396                                 pSubscriptionList,\r
1397                                 subscriptionCount,\r
1398                                 flags,\r
1399                                 pCallbackInfo,\r
1400                                 pSubscribeOperation );\r
1401 }\r
1402 \r
1403 /*-----------------------------------------------------------*/\r
1404 \r
1405 IotMqttError_t IotMqtt_TimedSubscribe( IotMqttConnection_t mqttConnection,\r
1406                                        const IotMqttSubscription_t * pSubscriptionList,\r
1407                                        size_t subscriptionCount,\r
1408                                        uint32_t flags,\r
1409                                        uint32_t timeoutMs )\r
1410 {\r
1411     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1412     IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1413 \r
1414     /* Flags are not used, but the parameter is present for future compatibility. */\r
1415     ( void ) flags;\r
1416 \r
1417     /* Call the asynchronous SUBSCRIBE function. */\r
1418     status = IotMqtt_Subscribe( mqttConnection,\r
1419                                 pSubscriptionList,\r
1420                                 subscriptionCount,\r
1421                                 IOT_MQTT_FLAG_WAITABLE,\r
1422                                 NULL,\r
1423                                 &subscribeOperation );\r
1424 \r
1425     /* Wait for the SUBSCRIBE operation to complete. */\r
1426     if( status == IOT_MQTT_STATUS_PENDING )\r
1427     {\r
1428         status = IotMqtt_Wait( subscribeOperation, timeoutMs );\r
1429     }\r
1430     else\r
1431     {\r
1432         EMPTY_ELSE_MARKER;\r
1433     }\r
1434 \r
1435     /* Ensure that a status was set. */\r
1436     IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
1437 \r
1438     return status;\r
1439 }\r
1440 \r
1441 /*-----------------------------------------------------------*/\r
1442 \r
1443 IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,\r
1444                                     const IotMqttSubscription_t * pSubscriptionList,\r
1445                                     size_t subscriptionCount,\r
1446                                     uint32_t flags,\r
1447                                     const IotMqttCallbackInfo_t * pCallbackInfo,\r
1448                                     IotMqttOperation_t * pUnsubscribeOperation )\r
1449 {\r
1450     return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
1451                                 mqttConnection,\r
1452                                 pSubscriptionList,\r
1453                                 subscriptionCount,\r
1454                                 flags,\r
1455                                 pCallbackInfo,\r
1456                                 pUnsubscribeOperation );\r
1457 }\r
1458 \r
1459 /*-----------------------------------------------------------*/\r
1460 \r
1461 IotMqttError_t IotMqtt_TimedUnsubscribe( IotMqttConnection_t mqttConnection,\r
1462                                          const IotMqttSubscription_t * pSubscriptionList,\r
1463                                          size_t subscriptionCount,\r
1464                                          uint32_t flags,\r
1465                                          uint32_t timeoutMs )\r
1466 {\r
1467     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1468     IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1469 \r
1470     /* Flags are not used, but the parameter is present for future compatibility. */\r
1471     ( void ) flags;\r
1472 \r
1473     /* Call the asynchronous UNSUBSCRIBE function. */\r
1474     status = IotMqtt_Unsubscribe( mqttConnection,\r
1475                                   pSubscriptionList,\r
1476                                   subscriptionCount,\r
1477                                   IOT_MQTT_FLAG_WAITABLE,\r
1478                                   NULL,\r
1479                                   &unsubscribeOperation );\r
1480 \r
1481     /* Wait for the UNSUBSCRIBE operation to complete. */\r
1482     if( status == IOT_MQTT_STATUS_PENDING )\r
1483     {\r
1484         status = IotMqtt_Wait( unsubscribeOperation, timeoutMs );\r
1485     }\r
1486     else\r
1487     {\r
1488         EMPTY_ELSE_MARKER;\r
1489     }\r
1490 \r
1491     /* Ensure that a status was set. */\r
1492     IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
1493 \r
1494     return status;\r
1495 }\r
1496 \r
1497 /*-----------------------------------------------------------*/\r
1498 \r
1499 IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,\r
1500                                 const IotMqttPublishInfo_t * pPublishInfo,\r
1501                                 uint32_t flags,\r
1502                                 const IotMqttCallbackInfo_t * pCallbackInfo,\r
1503                                 IotMqttOperation_t * pPublishOperation )\r
1504 {\r
1505     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
1506     _mqttOperation_t * pOperation = NULL;\r
1507     uint8_t ** pPacketIdentifierHigh = NULL;\r
1508 \r
1509     /* Default PUBLISH serializer function. */\r
1510     IotMqttError_t ( * serializePublish )( const IotMqttPublishInfo_t *,\r
1511                                            uint8_t **,\r
1512                                            size_t *,\r
1513                                            uint16_t *,\r
1514                                            uint8_t ** ) = _IotMqtt_SerializePublish;\r
1515 \r
1516     /* Check that the PUBLISH information is valid. */\r
1517     if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,\r
1518                                   pPublishInfo ) == false )\r
1519     {\r
1520         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1521     }\r
1522     else\r
1523     {\r
1524         EMPTY_ELSE_MARKER;\r
1525     }\r
1526 \r
1527     /* Check that no notification is requested for a QoS 0 publish. */\r
1528     if( pPublishInfo->qos == IOT_MQTT_QOS_0 )\r
1529     {\r
1530         if( pCallbackInfo != NULL )\r
1531         {\r
1532             IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
1533 \r
1534             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1535         }\r
1536         else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )\r
1537         {\r
1538             IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
1539 \r
1540             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1541         }\r
1542         else\r
1543         {\r
1544             EMPTY_ELSE_MARKER;\r
1545         }\r
1546 \r
1547         if( pPublishOperation != NULL )\r
1548         {\r
1549             IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );\r
1550         }\r
1551         else\r
1552         {\r
1553             EMPTY_ELSE_MARKER;\r
1554         }\r
1555     }\r
1556     else\r
1557     {\r
1558         EMPTY_ELSE_MARKER;\r
1559     }\r
1560 \r
1561     /* Check that a reference pointer is provided for a waitable operation. */\r
1562     if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
1563     {\r
1564         if( pPublishOperation == NULL )\r
1565         {\r
1566             IotLogError( "Reference must be provided for a waitable PUBLISH." );\r
1567 \r
1568             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1569         }\r
1570         else\r
1571         {\r
1572             EMPTY_ELSE_MARKER;\r
1573         }\r
1574     }\r
1575     else\r
1576     {\r
1577         EMPTY_ELSE_MARKER;\r
1578     }\r
1579 \r
1580     /* Create a PUBLISH operation. */\r
1581     status = _IotMqtt_CreateOperation( mqttConnection,\r
1582                                        flags,\r
1583                                        pCallbackInfo,\r
1584                                        &pOperation );\r
1585 \r
1586     if( status != IOT_MQTT_SUCCESS )\r
1587     {\r
1588         IOT_GOTO_CLEANUP();\r
1589     }\r
1590     else\r
1591     {\r
1592         EMPTY_ELSE_MARKER;\r
1593     }\r
1594 \r
1595     /* Check the PUBLISH operation data and set the operation type. */\r
1596     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1597     pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;\r
1598 \r
1599     /* Choose a PUBLISH serializer function. */\r
1600     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1601         if( mqttConnection->pSerializer != NULL )\r
1602         {\r
1603             if( mqttConnection->pSerializer->serialize.publish != NULL )\r
1604             {\r
1605                 serializePublish = mqttConnection->pSerializer->serialize.publish;\r
1606             }\r
1607             else\r
1608             {\r
1609                 EMPTY_ELSE_MARKER;\r
1610             }\r
1611         }\r
1612         else\r
1613         {\r
1614             EMPTY_ELSE_MARKER;\r
1615         }\r
1616     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
1617 \r
1618     /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */\r
1619     if( mqttConnection->awsIotMqttMode == true )\r
1620     {\r
1621         pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );\r
1622     }\r
1623     else\r
1624     {\r
1625         EMPTY_ELSE_MARKER;\r
1626     }\r
1627 \r
1628     /* Generate a PUBLISH packet from pPublishInfo. */\r
1629     status = serializePublish( pPublishInfo,\r
1630                                &( pOperation->u.operation.pMqttPacket ),\r
1631                                &( pOperation->u.operation.packetSize ),\r
1632                                &( pOperation->u.operation.packetIdentifier ),\r
1633                                pPacketIdentifierHigh );\r
1634 \r
1635     if( status != IOT_MQTT_SUCCESS )\r
1636     {\r
1637         IOT_GOTO_CLEANUP();\r
1638     }\r
1639     else\r
1640     {\r
1641         EMPTY_ELSE_MARKER;\r
1642     }\r
1643 \r
1644     /* Check the serialized MQTT packet. */\r
1645     IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1646     IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1647 \r
1648     /* Initialize PUBLISH retry if retryLimit is set. */\r
1649     if( pPublishInfo->retryLimit > 0 )\r
1650     {\r
1651         /* A QoS 0 PUBLISH may not be retried. */\r
1652         if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1653         {\r
1654             pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;\r
1655             pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;\r
1656         }\r
1657         else\r
1658         {\r
1659             EMPTY_ELSE_MARKER;\r
1660         }\r
1661     }\r
1662     else\r
1663     {\r
1664         EMPTY_ELSE_MARKER;\r
1665     }\r
1666 \r
1667     /* Set the reference, if provided. */\r
1668     if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1669     {\r
1670         if( pPublishOperation != NULL )\r
1671         {\r
1672             *pPublishOperation = pOperation;\r
1673         }\r
1674         else\r
1675         {\r
1676             EMPTY_ELSE_MARKER;\r
1677         }\r
1678     }\r
1679     else\r
1680     {\r
1681         EMPTY_ELSE_MARKER;\r
1682     }\r
1683 \r
1684     /* Add the PUBLISH operation to the send queue for network transmission. */\r
1685     status = _IotMqtt_ScheduleOperation( pOperation,\r
1686                                          _IotMqtt_ProcessSend,\r
1687                                          0 );\r
1688 \r
1689     if( status != IOT_MQTT_SUCCESS )\r
1690     {\r
1691         IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",\r
1692                      mqttConnection );\r
1693 \r
1694         /* Clear the previously set (and now invalid) reference. */\r
1695         if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1696         {\r
1697             if( pPublishOperation != NULL )\r
1698             {\r
1699                 *pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1700             }\r
1701             else\r
1702             {\r
1703                 EMPTY_ELSE_MARKER;\r
1704             }\r
1705         }\r
1706         else\r
1707         {\r
1708             EMPTY_ELSE_MARKER;\r
1709         }\r
1710 \r
1711         IOT_GOTO_CLEANUP();\r
1712     }\r
1713     else\r
1714     {\r
1715         EMPTY_ELSE_MARKER;\r
1716     }\r
1717 \r
1718     /* Clean up the PUBLISH operation if this function fails. Otherwise, set the\r
1719      * appropriate return code based on QoS. */\r
1720     IOT_FUNCTION_CLEANUP_BEGIN();\r
1721 \r
1722     if( status != IOT_MQTT_SUCCESS )\r
1723     {\r
1724         if( pOperation != NULL )\r
1725         {\r
1726             _IotMqtt_DestroyOperation( pOperation );\r
1727         }\r
1728         else\r
1729         {\r
1730             EMPTY_ELSE_MARKER;\r
1731         }\r
1732     }\r
1733     else\r
1734     {\r
1735         if( pPublishInfo->qos > IOT_MQTT_QOS_0 )\r
1736         {\r
1737             status = IOT_MQTT_STATUS_PENDING;\r
1738         }\r
1739         else\r
1740         {\r
1741             EMPTY_ELSE_MARKER;\r
1742         }\r
1743 \r
1744         IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",\r
1745                     mqttConnection );\r
1746     }\r
1747 \r
1748     IOT_FUNCTION_CLEANUP_END();\r
1749 }\r
1750 \r
1751 /*-----------------------------------------------------------*/\r
1752 \r
1753 IotMqttError_t IotMqtt_TimedPublish( IotMqttConnection_t mqttConnection,\r
1754                                      const IotMqttPublishInfo_t * pPublishInfo,\r
1755                                      uint32_t flags,\r
1756                                      uint32_t timeoutMs )\r
1757 {\r
1758     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1759     IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER,\r
1760                        * pPublishOperation = NULL;\r
1761 \r
1762     /* Clear the flags. */\r
1763     flags = 0;\r
1764 \r
1765     /* Set the waitable flag and reference for QoS 1 PUBLISH. */\r
1766     if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
1767     {\r
1768         flags = IOT_MQTT_FLAG_WAITABLE;\r
1769         pPublishOperation = &publishOperation;\r
1770     }\r
1771     else\r
1772     {\r
1773         EMPTY_ELSE_MARKER;\r
1774     }\r
1775 \r
1776     /* Call the asynchronous PUBLISH function. */\r
1777     status = IotMqtt_Publish( mqttConnection,\r
1778                               pPublishInfo,\r
1779                               flags,\r
1780                               NULL,\r
1781                               pPublishOperation );\r
1782 \r
1783     /* Wait for a queued QoS 1 PUBLISH to complete. */\r
1784     if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
1785     {\r
1786         if( status == IOT_MQTT_STATUS_PENDING )\r
1787         {\r
1788             status = IotMqtt_Wait( publishOperation, timeoutMs );\r
1789         }\r
1790         else\r
1791         {\r
1792             EMPTY_ELSE_MARKER;\r
1793         }\r
1794     }\r
1795     else\r
1796     {\r
1797         EMPTY_ELSE_MARKER;\r
1798     }\r
1799 \r
1800     return status;\r
1801 }\r
1802 \r
1803 /*-----------------------------------------------------------*/\r
1804 \r
1805 IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,\r
1806                              uint32_t timeoutMs )\r
1807 {\r
1808     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
1809     _mqttConnection_t * pMqttConnection = operation->pMqttConnection;\r
1810 \r
1811     /* Validate the given operation reference. */\r
1812     if( _IotMqtt_ValidateOperation( operation ) == false )\r
1813     {\r
1814         status = IOT_MQTT_BAD_PARAMETER;\r
1815     }\r
1816     else\r
1817     {\r
1818         EMPTY_ELSE_MARKER;\r
1819     }\r
1820 \r
1821     /* Check the MQTT connection status. */\r
1822     if( status == IOT_MQTT_SUCCESS )\r
1823     {\r
1824         IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
1825 \r
1826         if( pMqttConnection->disconnected == true )\r
1827         {\r
1828             IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "\r
1829                          "Operation cannot be waited on.",\r
1830                          pMqttConnection,\r
1831                          IotMqtt_OperationType( operation->u.operation.type ),\r
1832                          operation );\r
1833 \r
1834             status = IOT_MQTT_NETWORK_ERROR;\r
1835         }\r
1836         else\r
1837         {\r
1838             IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",\r
1839                         pMqttConnection,\r
1840                         IotMqtt_OperationType( operation->u.operation.type ),\r
1841                         operation );\r
1842         }\r
1843 \r
1844         IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
1845 \r
1846         /* Only wait on an operation if the MQTT connection is active. */\r
1847         if( status == IOT_MQTT_SUCCESS )\r
1848         {\r
1849             if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),\r
1850                                         timeoutMs ) == false )\r
1851             {\r
1852                 status = IOT_MQTT_TIMEOUT;\r
1853 \r
1854                 /* Attempt to cancel the job of the timed out operation. */\r
1855                 ( void ) _IotMqtt_DecrementOperationReferences( operation, true );\r
1856 \r
1857                 /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */\r
1858                 if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
1859                 {\r
1860                     IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"\r
1861                                  " subscriptions of timed-out SUBSCRIBE.",\r
1862                                  pMqttConnection,\r
1863                                  operation );\r
1864 \r
1865                     _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,\r
1866                                                          operation->u.operation.packetIdentifier,\r
1867                                                          -1 );\r
1868                 }\r
1869                 else\r
1870                 {\r
1871                     EMPTY_ELSE_MARKER;\r
1872                 }\r
1873             }\r
1874             else\r
1875             {\r
1876                 /* Retrieve the status of the completed operation. */\r
1877                 status = operation->u.operation.status;\r
1878             }\r
1879 \r
1880             IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",\r
1881                         pMqttConnection,\r
1882                         IotMqtt_OperationType( operation->u.operation.type ),\r
1883                         operation,\r
1884                         IotMqtt_strerror( status ) );\r
1885         }\r
1886         else\r
1887         {\r
1888             EMPTY_ELSE_MARKER;\r
1889         }\r
1890 \r
1891         /* Wait is finished; decrement operation reference count. */\r
1892         if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )\r
1893         {\r
1894             _IotMqtt_DestroyOperation( operation );\r
1895         }\r
1896         else\r
1897         {\r
1898             EMPTY_ELSE_MARKER;\r
1899         }\r
1900     }\r
1901     else\r
1902     {\r
1903         EMPTY_ELSE_MARKER;\r
1904     }\r
1905 \r
1906     return status;\r
1907 }\r
1908 \r
1909 /*-----------------------------------------------------------*/\r
1910 \r
1911 const char * IotMqtt_strerror( IotMqttError_t status )\r
1912 {\r
1913     const char * pMessage = NULL;\r
1914 \r
1915     switch( status )\r
1916     {\r
1917         case IOT_MQTT_SUCCESS:\r
1918             pMessage = "SUCCESS";\r
1919             break;\r
1920 \r
1921         case IOT_MQTT_STATUS_PENDING:\r
1922             pMessage = "PENDING";\r
1923             break;\r
1924 \r
1925         case IOT_MQTT_INIT_FAILED:\r
1926             pMessage = "INITIALIZATION FAILED";\r
1927             break;\r
1928 \r
1929         case IOT_MQTT_BAD_PARAMETER:\r
1930             pMessage = "BAD PARAMETER";\r
1931             break;\r
1932 \r
1933         case IOT_MQTT_NO_MEMORY:\r
1934             pMessage = "NO MEMORY";\r
1935             break;\r
1936 \r
1937         case IOT_MQTT_NETWORK_ERROR:\r
1938             pMessage = "NETWORK ERROR";\r
1939             break;\r
1940 \r
1941         case IOT_MQTT_SCHEDULING_ERROR:\r
1942             pMessage = "SCHEDULING ERROR";\r
1943             break;\r
1944 \r
1945         case IOT_MQTT_BAD_RESPONSE:\r
1946             pMessage = "BAD RESPONSE RECEIVED";\r
1947             break;\r
1948 \r
1949         case IOT_MQTT_TIMEOUT:\r
1950             pMessage = "TIMEOUT";\r
1951             break;\r
1952 \r
1953         case IOT_MQTT_SERVER_REFUSED:\r
1954             pMessage = "SERVER REFUSED";\r
1955             break;\r
1956 \r
1957         case IOT_MQTT_RETRY_NO_RESPONSE:\r
1958             pMessage = "NO RESPONSE";\r
1959             break;\r
1960 \r
1961         default:\r
1962             pMessage = "INVALID STATUS";\r
1963             break;\r
1964     }\r
1965 \r
1966     return pMessage;\r
1967 }\r
1968 \r
1969 /*-----------------------------------------------------------*/\r
1970 \r
1971 const char * IotMqtt_OperationType( IotMqttOperationType_t operation )\r
1972 {\r
1973     const char * pMessage = NULL;\r
1974 \r
1975     switch( operation )\r
1976     {\r
1977         case IOT_MQTT_CONNECT:\r
1978             pMessage = "CONNECT";\r
1979             break;\r
1980 \r
1981         case IOT_MQTT_PUBLISH_TO_SERVER:\r
1982             pMessage = "PUBLISH";\r
1983             break;\r
1984 \r
1985         case IOT_MQTT_PUBACK:\r
1986             pMessage = "PUBACK";\r
1987             break;\r
1988 \r
1989         case IOT_MQTT_SUBSCRIBE:\r
1990             pMessage = "SUBSCRIBE";\r
1991             break;\r
1992 \r
1993         case IOT_MQTT_UNSUBSCRIBE:\r
1994             pMessage = "UNSUBSCRIBE";\r
1995             break;\r
1996 \r
1997         case IOT_MQTT_PINGREQ:\r
1998             pMessage = "PINGREQ";\r
1999             break;\r
2000 \r
2001         case IOT_MQTT_DISCONNECT:\r
2002             pMessage = "DISCONNECT";\r
2003             break;\r
2004 \r
2005         default:\r
2006             pMessage = "INVALID OPERATION";\r
2007             break;\r
2008     }\r
2009 \r
2010     return pMessage;\r
2011 }\r
2012 \r
2013 /*-----------------------------------------------------------*/\r
2014 \r
2015 /* Provide access to internal functions and variables if testing. */\r
2016 #if IOT_BUILD_TESTS == 1\r
2017     #include "iot_test_access_mqtt_api.c"\r
2018 #endif\r