]> git.sur5r.net Git - freertos/blob - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
Correct an err in queue.c introduced when previously updating behaviour when queue...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_api.c
1 /*\r
2  * Amazon FreeRTOS MQTT V2.0.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://aws.amazon.com/freertos\r
23  * http://www.FreeRTOS.org\r
24  */\r
25 \r
26 /**\r
27  * @file iot_mqtt_api.c\r
28  * @brief Implements most user-facing functions of the MQTT library.\r
29  */\r
30 \r
31 /* The config header is always included first. */\r
32 #include "iot_config.h"\r
33 \r
34 /* Standard includes. */\r
35 #include <string.h>\r
36 \r
37 /* Error handling include. */\r
38 #include "private/iot_error.h"\r
39 \r
40 /* MQTT internal include. */\r
41 #include "private/iot_mqtt_internal.h"\r
42 \r
43 /* Platform layer includes. */\r
44 #include "platform/iot_clock.h"\r
45 #include "platform/iot_threads.h"\r
46 \r
47 /* Validate MQTT configuration settings. */\r
48 #if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1\r
49     #error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1."\r
50 #endif\r
51 #if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1\r
52     #error "IOT_MQTT_ENABLE_METRICS must be 0 or 1."\r
53 #endif\r
54 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1\r
55     #error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1."\r
56 #endif\r
57 #if IOT_MQTT_RESPONSE_WAIT_MS <= 0\r
58     #error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative."\r
59 #endif\r
60 #if IOT_MQTT_RETRY_MS_CEILING <= 0\r
61     #error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative."\r
62 #endif\r
63 \r
64 /*-----------------------------------------------------------*/\r
65 \r
66 /**\r
67  * @brief Set the unsubscribed flag of an MQTT subscription.\r
68  *\r
69  * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.\r
70  * @param[in] pMatch Not used.\r
71  *\r
72  * @return Always returns `true`.\r
73  */\r
74 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
75                                               void * pMatch );\r
76 \r
77 /**\r
78  * @brief Destroy an MQTT subscription if its reference count is 0.\r
79  *\r
80  * @param[in] pData The subscription to destroy. This parameter is of type\r
81  * `void*` for compatibility with [free]\r
82  * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
83  */\r
84 static void _mqttSubscription_tryDestroy( void * pData );\r
85 \r
86 /**\r
87  * @brief Decrement the reference count of an MQTT operation and attempt to\r
88  * destroy it.\r
89  *\r
90  * @param[in] pData The operation data to destroy. This parameter is of type\r
91  * `void*` for compatibility with [free]\r
92  * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).\r
93  */\r
94 static void _mqttOperation_tryDestroy( void * pData );\r
95 \r
96 /**\r
97  * @brief Initialize the keep-alive operation for an MQTT connection.\r
98  *\r
99  * @param[in] pNetworkInfo User-provided network information for the new\r
100  * connection.\r
101  * @param[in] keepAliveSeconds User-provided keep-alive interval.\r
102  * @param[out] pMqttConnection The MQTT connection associated with the keep-alive.\r
103  *\r
104  * @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
105  */\r
106 static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
107                                        uint16_t keepAliveSeconds,\r
108                                        _mqttConnection_t * pMqttConnection );\r
109 \r
110 /**\r
111  * @brief Creates a new MQTT connection and initializes its members.\r
112  *\r
113  * @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server.\r
114  * @param[in] pNetworkInfo User-provided network information for the new\r
115  * connection.\r
116  * @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection.\r
117  *\r
118  * @return Pointer to a newly-created MQTT connection; `NULL` on failure.\r
119  */\r
120 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
121                                                   const IotMqttNetworkInfo_t * pNetworkInfo,\r
122                                                   uint16_t keepAliveSeconds );\r
123 \r
124 /**\r
125  * @brief Destroys the members of an MQTT connection.\r
126  *\r
127  * @param[in] pMqttConnection Which connection to destroy.\r
128  */\r
129 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection );\r
130 \r
131 /**\r
132  * @brief The common component of both @ref mqtt_function_subscribe and @ref\r
133  * mqtt_function_unsubscribe.\r
134  *\r
135  * See @ref mqtt_function_subscribe or @ref mqtt_function_unsubscribe for a\r
136  * description of the parameters and return values.\r
137  */\r
138 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
139                                            IotMqttConnection_t mqttConnection,\r
140                                            const IotMqttSubscription_t * pSubscriptionList,\r
141                                            size_t subscriptionCount,\r
142                                            uint32_t flags,\r
143                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
144                                            IotMqttOperation_t * const pOperationReference );\r
145 \r
146 /*-----------------------------------------------------------*/\r
147 \r
148 static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink,\r
149                                               void * pMatch )\r
150 {\r
151     /* Because this function is called from a container function, the given link\r
152      * must never be NULL. */\r
153     IotMqtt_Assert( pSubscriptionLink != NULL );\r
154 \r
155     _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,\r
156                                                              pSubscriptionLink,\r
157                                                              link );\r
158 \r
159     /* Silence warnings about unused parameters. */\r
160     ( void ) pMatch;\r
161 \r
162     /* Set the unsubscribed flag. */\r
163     pSubscription->unsubscribed = true;\r
164 \r
165     return true;\r
166 }\r
167 \r
168 /*-----------------------------------------------------------*/\r
169 \r
170 static void _mqttSubscription_tryDestroy( void * pData )\r
171 {\r
172     _mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData;\r
173 \r
174     /* Reference count must not be negative. */\r
175     IotMqtt_Assert( pSubscription->references >= 0 );\r
176 \r
177     /* Unsubscribed flag should be set. */\r
178     IotMqtt_Assert( pSubscription->unsubscribed == true );\r
179 \r
180     /* Free the subscription if it has no references. */\r
181     if( pSubscription->references == 0 )\r
182     {\r
183         IotMqtt_FreeSubscription( pSubscription );\r
184     }\r
185     else\r
186     {\r
187         EMPTY_ELSE_MARKER;\r
188     }\r
189 }\r
190 \r
191 /*-----------------------------------------------------------*/\r
192 \r
193 static void _mqttOperation_tryDestroy( void * pData )\r
194 {\r
195     _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData;\r
196     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
197 \r
198     /* Incoming PUBLISH operations may always be freed. */\r
199     if( pOperation->incomingPublish == true )\r
200     {\r
201         /* Cancel the incoming PUBLISH operation's job. */\r
202         taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
203                                                 pOperation->job,\r
204                                                 NULL );\r
205 \r
206         /* If the operation's job was not canceled, it must be already executing.\r
207          * Any other return value is invalid. */\r
208         IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
209                         ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
210 \r
211         /* Check if the incoming PUBLISH job was canceled. */\r
212         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
213         {\r
214             /* Job was canceled. Process incoming PUBLISH now to clean up. */\r
215             _IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL,\r
216                                              pOperation->job,\r
217                                              pOperation );\r
218         }\r
219         else\r
220         {\r
221             /* The executing job will process the PUBLISH, so nothing is done here. */\r
222             EMPTY_ELSE_MARKER;\r
223         }\r
224     }\r
225     else\r
226     {\r
227         /* Decrement reference count and destroy operation if possible. */\r
228         if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true )\r
229         {\r
230             _IotMqtt_DestroyOperation( pOperation );\r
231         }\r
232         else\r
233         {\r
234             EMPTY_ELSE_MARKER;\r
235         }\r
236     }\r
237 }\r
238 \r
239 /*-----------------------------------------------------------*/\r
240 \r
241 static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
242                                        uint16_t keepAliveSeconds,\r
243                                        _mqttConnection_t * pMqttConnection )\r
244 {\r
245     bool status = true;\r
246     IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
247     IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS;\r
248 \r
249     /* Network information is not used when MQTT packet serializers are disabled. */\r
250     ( void ) pNetworkInfo;\r
251 \r
252     /* Default PINGREQ serializer function. */\r
253     IotMqttError_t ( * serializePingreq )( uint8_t **,\r
254                                            size_t * ) = _IotMqtt_SerializePingreq;\r
255 \r
256     /* Set PINGREQ operation members. */\r
257     pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;\r
258 \r
259     /* Convert the keep-alive interval to milliseconds. */\r
260     pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;\r
261     pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;\r
262 \r
263     /* Choose a PINGREQ serializer function. */\r
264     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
265         if( pNetworkInfo->pMqttSerializer != NULL )\r
266         {\r
267             if( pNetworkInfo->pMqttSerializer->serialize.pingreq != NULL )\r
268             {\r
269                 serializePingreq = pNetworkInfo->pMqttSerializer->serialize.pingreq;\r
270             }\r
271             else\r
272             {\r
273                 EMPTY_ELSE_MARKER;\r
274             }\r
275         }\r
276         else\r
277         {\r
278             EMPTY_ELSE_MARKER;\r
279         }\r
280     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
281 \r
282     /* Generate a PINGREQ packet. */\r
283     serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),\r
284                                         &( pMqttConnection->pingreq.u.operation.packetSize ) );\r
285 \r
286     if( serializeStatus != IOT_MQTT_SUCCESS )\r
287     {\r
288         IotLogError( "Failed to allocate PINGREQ packet for new connection." );\r
289 \r
290         status = false;\r
291     }\r
292     else\r
293     {\r
294         /* Create the task pool job that processes keep-alive. */\r
295         jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
296                                            pMqttConnection,\r
297                                            &( pMqttConnection->pingreq.jobStorage ),\r
298                                            &( pMqttConnection->pingreq.job ) );\r
299 \r
300         /* Task pool job creation for a pre-allocated job should never fail.\r
301          * Abort the program if it does. */\r
302         if( jobStatus != IOT_TASKPOOL_SUCCESS )\r
303         {\r
304             IotLogError( "Failed to create keep-alive job for new connection." );\r
305 \r
306             IotMqtt_Assert( false );\r
307         }\r
308         else\r
309         {\r
310             EMPTY_ELSE_MARKER;\r
311         }\r
312 \r
313         /* Keep-alive references its MQTT connection, so increment reference. */\r
314         ( pMqttConnection->references )++;\r
315     }\r
316 \r
317     return status;\r
318 }\r
319 \r
320 /*-----------------------------------------------------------*/\r
321 \r
322 static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,\r
323                                                   const IotMqttNetworkInfo_t * pNetworkInfo,\r
324                                                   uint16_t keepAliveSeconds )\r
325 {\r
326     IOT_FUNCTION_ENTRY( bool, true );\r
327     _mqttConnection_t * pMqttConnection = NULL;\r
328     bool referencesMutexCreated = false, subscriptionMutexCreated = false;\r
329 \r
330     /* Allocate memory for the new MQTT connection. */\r
331     pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) );\r
332 \r
333     if( pMqttConnection == NULL )\r
334     {\r
335         IotLogError( "Failed to allocate memory for new connection." );\r
336 \r
337         IOT_SET_AND_GOTO_CLEANUP( false );\r
338     }\r
339     else\r
340     {\r
341         /* Clear the MQTT connection, then copy the MQTT server mode, network\r
342          * interface, and disconnect callback. */\r
343         ( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) );\r
344         pMqttConnection->awsIotMqttMode = awsIotMqttMode;\r
345         pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface;\r
346         pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback;\r
347 \r
348         /* Start a new MQTT connection with a reference count of 1. */\r
349         pMqttConnection->references = 1;\r
350     }\r
351 \r
352     /* Create the references mutex for a new connection. It is a recursive mutex. */\r
353     referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true );\r
354 \r
355     if( referencesMutexCreated == false )\r
356     {\r
357         IotLogError( "Failed to create references mutex for new connection." );\r
358 \r
359         IOT_SET_AND_GOTO_CLEANUP( false );\r
360     }\r
361     else\r
362     {\r
363         EMPTY_ELSE_MARKER;\r
364     }\r
365 \r
366     /* Create the subscription mutex for a new connection. */\r
367     subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false );\r
368 \r
369     if( subscriptionMutexCreated == false )\r
370     {\r
371         IotLogError( "Failed to create subscription mutex for new connection." );\r
372 \r
373         IOT_SET_AND_GOTO_CLEANUP( false );\r
374     }\r
375     else\r
376     {\r
377         EMPTY_ELSE_MARKER;\r
378     }\r
379 \r
380     /* Create the new connection's subscription and operation lists. */\r
381     IotListDouble_Create( &( pMqttConnection->subscriptionList ) );\r
382     IotListDouble_Create( &( pMqttConnection->pendingProcessing ) );\r
383     IotListDouble_Create( &( pMqttConnection->pendingResponse ) );\r
384 \r
385     /* AWS IoT service limits set minimum and maximum values for keep-alive interval.\r
386      * Adjust the user-provided keep-alive interval based on these requirements. */\r
387     if( awsIotMqttMode == true )\r
388     {\r
389         if( keepAliveSeconds < AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE )\r
390         {\r
391             keepAliveSeconds = AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE;\r
392         }\r
393         else if( keepAliveSeconds > AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE )\r
394         {\r
395             keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;\r
396         }\r
397         else if( keepAliveSeconds == 0 )\r
398         {\r
399             keepAliveSeconds = AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE;\r
400         }\r
401         else\r
402         {\r
403             EMPTY_ELSE_MARKER;\r
404         }\r
405     }\r
406     else\r
407     {\r
408         EMPTY_ELSE_MARKER;\r
409     }\r
410 \r
411     /* Check if keep-alive is active for this connection. */\r
412     if( keepAliveSeconds != 0 )\r
413     {\r
414         if( _createKeepAliveOperation( pNetworkInfo,\r
415                                        keepAliveSeconds,\r
416                                        pMqttConnection ) == false )\r
417         {\r
418             IOT_SET_AND_GOTO_CLEANUP( false );\r
419         }\r
420         else\r
421         {\r
422             EMPTY_ELSE_MARKER;\r
423         }\r
424     }\r
425     else\r
426     {\r
427         EMPTY_ELSE_MARKER;\r
428     }\r
429 \r
430     /* Clean up mutexes and connection if this function failed. */\r
431     IOT_FUNCTION_CLEANUP_BEGIN();\r
432 \r
433     if( status == false )\r
434     {\r
435         if( subscriptionMutexCreated == true )\r
436         {\r
437             IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
438         }\r
439         else\r
440         {\r
441             EMPTY_ELSE_MARKER;\r
442         }\r
443 \r
444         if( referencesMutexCreated == true )\r
445         {\r
446             IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
447         }\r
448         else\r
449         {\r
450             EMPTY_ELSE_MARKER;\r
451         }\r
452 \r
453         if( pMqttConnection != NULL )\r
454         {\r
455             IotMqtt_FreeConnection( pMqttConnection );\r
456             pMqttConnection = NULL;\r
457         }\r
458         else\r
459         {\r
460             EMPTY_ELSE_MARKER;\r
461         }\r
462     }\r
463     else\r
464     {\r
465         EMPTY_ELSE_MARKER;\r
466     }\r
467 \r
468     return pMqttConnection;\r
469 }\r
470 \r
471 /*-----------------------------------------------------------*/\r
472 \r
473 static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )\r
474 {\r
475     IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
476 \r
477     /* Default free packet function. */\r
478     void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;\r
479 \r
480     /* Clean up keep-alive if still allocated. */\r
481     if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
482     {\r
483         IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
484 \r
485         /* Choose a function to free the packet. */\r
486         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
487             if( pMqttConnection->pSerializer != NULL )\r
488             {\r
489                 if( pMqttConnection->pSerializer->freePacket != NULL )\r
490                 {\r
491                     freePacket = pMqttConnection->pSerializer->freePacket;\r
492                 }\r
493             }\r
494         #endif\r
495 \r
496         freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
497 \r
498         /* Clear data about the keep-alive. */\r
499         pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
500         pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
501         pMqttConnection->pingreq.u.operation.packetSize = 0;\r
502 \r
503         /* Decrement reference count. */\r
504         pMqttConnection->references--;\r
505     }\r
506     else\r
507     {\r
508         EMPTY_ELSE_MARKER;\r
509     }\r
510 \r
511     /* A connection to be destroyed should have no keep-alive and at most 1\r
512      * reference. */\r
513     IotMqtt_Assert( pMqttConnection->references <= 1 );\r
514     IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );\r
515     IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );\r
516     IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );\r
517 \r
518     /* Remove all subscriptions. */\r
519     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
520     IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),\r
521                                     _mqttSubscription_setUnsubscribe,\r
522                                     NULL,\r
523                                     _mqttSubscription_tryDestroy,\r
524                                     offsetof( _mqttSubscription_t, link ) );\r
525     IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );\r
526 \r
527     /* Destroy an owned network connection. */\r
528     if( pMqttConnection->ownNetworkConnection == true )\r
529     {\r
530         networkStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );\r
531 \r
532         if( networkStatus != IOT_NETWORK_SUCCESS )\r
533         {\r
534             IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",\r
535                         pMqttConnection );\r
536         }\r
537         else\r
538         {\r
539             IotLogInfo( "(MQTT connection %p) Network connection destroyed.",\r
540                         pMqttConnection );\r
541         }\r
542     }\r
543     else\r
544     {\r
545         EMPTY_ELSE_MARKER;\r
546     }\r
547 \r
548     /* Destroy mutexes. */\r
549     IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );\r
550     IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );\r
551 \r
552     IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );\r
553 \r
554     /* Free connection. */\r
555     IotMqtt_FreeConnection( pMqttConnection );\r
556 }\r
557 \r
558 /*-----------------------------------------------------------*/\r
559 \r
560 static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,\r
561                                            IotMqttConnection_t mqttConnection,\r
562                                            const IotMqttSubscription_t * pSubscriptionList,\r
563                                            size_t subscriptionCount,\r
564                                            uint32_t flags,\r
565                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
566                                            IotMqttOperation_t * const pOperationReference )\r
567 {\r
568     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
569     _mqttOperation_t * pSubscriptionOperation = NULL;\r
570 \r
571     /* Subscription serializer function. */\r
572     IotMqttError_t ( * serializeSubscription )( const IotMqttSubscription_t *,\r
573                                                 size_t,\r
574                                                 uint8_t **,\r
575                                                 size_t *,\r
576                                                 uint16_t * ) = NULL;\r
577 \r
578     /* This function should only be called for subscribe or unsubscribe. */\r
579     IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||\r
580                     ( operation == IOT_MQTT_UNSUBSCRIBE ) );\r
581 \r
582     /* Check that all elements in the subscription list are valid. */\r
583     if( _IotMqtt_ValidateSubscriptionList( operation,\r
584                                            mqttConnection->awsIotMqttMode,\r
585                                            pSubscriptionList,\r
586                                            subscriptionCount ) == false )\r
587     {\r
588         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
589     }\r
590     else\r
591     {\r
592         EMPTY_ELSE_MARKER;\r
593     }\r
594 \r
595     /* Check that a reference pointer is provided for a waitable operation. */\r
596     if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
597     {\r
598         if( pOperationReference == NULL )\r
599         {\r
600             IotLogError( "Reference must be provided for a waitable %s.",\r
601                          IotMqtt_OperationType( operation ) );\r
602 \r
603             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
604         }\r
605         else\r
606         {\r
607             EMPTY_ELSE_MARKER;\r
608         }\r
609     }\r
610     else\r
611     {\r
612         EMPTY_ELSE_MARKER;\r
613     }\r
614 \r
615     /* Choose a subscription serialize function. */\r
616     if( operation == IOT_MQTT_SUBSCRIBE )\r
617     {\r
618         serializeSubscription = _IotMqtt_SerializeSubscribe;\r
619 \r
620         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
621             if( mqttConnection->pSerializer != NULL )\r
622             {\r
623                 if( mqttConnection->pSerializer->serialize.subscribe != NULL )\r
624                 {\r
625                     serializeSubscription = mqttConnection->pSerializer->serialize.subscribe;\r
626                 }\r
627                 else\r
628                 {\r
629                     EMPTY_ELSE_MARKER;\r
630                 }\r
631             }\r
632             else\r
633             {\r
634                 EMPTY_ELSE_MARKER;\r
635             }\r
636         #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
637     }\r
638     else\r
639     {\r
640         serializeSubscription = _IotMqtt_SerializeUnsubscribe;\r
641 \r
642         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
643             if( mqttConnection->pSerializer != NULL )\r
644             {\r
645                 if( mqttConnection->pSerializer->serialize.unsubscribe != NULL )\r
646                 {\r
647                     serializeSubscription = mqttConnection->pSerializer->serialize.unsubscribe;\r
648                 }\r
649                 else\r
650                 {\r
651                     EMPTY_ELSE_MARKER;\r
652                 }\r
653             }\r
654             else\r
655             {\r
656                 EMPTY_ELSE_MARKER;\r
657             }\r
658         #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
659     }\r
660 \r
661     /* Remove the MQTT subscription list for an UNSUBSCRIBE. */\r
662     if( operation == IOT_MQTT_UNSUBSCRIBE )\r
663     {\r
664         _IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection,\r
665                                                   pSubscriptionList,\r
666                                                   subscriptionCount );\r
667     }\r
668     else\r
669     {\r
670         EMPTY_ELSE_MARKER;\r
671     }\r
672 \r
673     /* Create a subscription operation. */\r
674     status = _IotMqtt_CreateOperation( mqttConnection,\r
675                                        flags,\r
676                                        pCallbackInfo,\r
677                                        &pSubscriptionOperation );\r
678 \r
679     if( status != IOT_MQTT_SUCCESS )\r
680     {\r
681         IOT_GOTO_CLEANUP();\r
682     }\r
683 \r
684     /* Check the subscription operation data and set the operation type. */\r
685     IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
686     IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );\r
687     pSubscriptionOperation->u.operation.type = operation;\r
688 \r
689     /* Generate a subscription packet from the subscription list. */\r
690     status = serializeSubscription( pSubscriptionList,\r
691                                     subscriptionCount,\r
692                                     &( pSubscriptionOperation->u.operation.pMqttPacket ),\r
693                                     &( pSubscriptionOperation->u.operation.packetSize ),\r
694                                     &( pSubscriptionOperation->u.operation.packetIdentifier ) );\r
695 \r
696     if( status != IOT_MQTT_SUCCESS )\r
697     {\r
698         IOT_GOTO_CLEANUP();\r
699     }\r
700 \r
701     /* Check the serialized MQTT packet. */\r
702     IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL );\r
703     IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 );\r
704 \r
705     /* Add the subscription list for a SUBSCRIBE. */\r
706     if( operation == IOT_MQTT_SUBSCRIBE )\r
707     {\r
708         status = _IotMqtt_AddSubscriptions( mqttConnection,\r
709                                             pSubscriptionOperation->u.operation.packetIdentifier,\r
710                                             pSubscriptionList,\r
711                                             subscriptionCount );\r
712 \r
713         if( status != IOT_MQTT_SUCCESS )\r
714         {\r
715             IOT_GOTO_CLEANUP();\r
716         }\r
717     }\r
718 \r
719     /* Set the reference, if provided. */\r
720     if( pOperationReference != NULL )\r
721     {\r
722         *pOperationReference = pSubscriptionOperation;\r
723     }\r
724 \r
725     /* Schedule the subscription operation for network transmission. */\r
726     status = _IotMqtt_ScheduleOperation( pSubscriptionOperation,\r
727                                          _IotMqtt_ProcessSend,\r
728                                          0 );\r
729 \r
730     if( status != IOT_MQTT_SUCCESS )\r
731     {\r
732         IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.",\r
733                      mqttConnection,\r
734                      IotMqtt_OperationType( operation ) );\r
735 \r
736         if( operation == IOT_MQTT_SUBSCRIBE )\r
737         {\r
738             _IotMqtt_RemoveSubscriptionByPacket( mqttConnection,\r
739                                                  pSubscriptionOperation->u.operation.packetIdentifier,\r
740                                                  -1 );\r
741         }\r
742 \r
743         /* Clear the previously set (and now invalid) reference. */\r
744         if( pOperationReference != NULL )\r
745         {\r
746             *pOperationReference = IOT_MQTT_OPERATION_INITIALIZER;\r
747         }\r
748 \r
749         IOT_GOTO_CLEANUP();\r
750     }\r
751 \r
752     /* Clean up if this function failed. */\r
753     IOT_FUNCTION_CLEANUP_BEGIN();\r
754 \r
755     if( status != IOT_MQTT_SUCCESS )\r
756     {\r
757         if( pSubscriptionOperation != NULL )\r
758         {\r
759             _IotMqtt_DestroyOperation( pSubscriptionOperation );\r
760         }\r
761     }\r
762     else\r
763     {\r
764         status = IOT_MQTT_STATUS_PENDING;\r
765 \r
766         IotLogInfo( "(MQTT connection %p) %s operation scheduled.",\r
767                     mqttConnection,\r
768                     IotMqtt_OperationType( operation ) );\r
769     }\r
770 \r
771     IOT_FUNCTION_CLEANUP_END();\r
772 }\r
773 \r
774 /*-----------------------------------------------------------*/\r
775 \r
776 bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
777 {\r
778     bool disconnected = false;\r
779 \r
780     /* Lock the mutex protecting the reference count. */\r
781     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
782 \r
783     /* Reference count must not be negative. */\r
784     IotMqtt_Assert( pMqttConnection->references >= 0 );\r
785 \r
786     /* Read connection status. */\r
787     disconnected = pMqttConnection->disconnected;\r
788 \r
789     /* Increment the connection's reference count if it is not disconnected. */\r
790     if( disconnected == false )\r
791     {\r
792         ( pMqttConnection->references )++;\r
793         IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
794                      pMqttConnection,\r
795                      ( long int ) pMqttConnection->references - 1,\r
796                      ( long int ) pMqttConnection->references );\r
797     }\r
798     else\r
799     {\r
800         IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );\r
801     }\r
802 \r
803     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
804 \r
805     return( disconnected == false );\r
806 }\r
807 \r
808 /*-----------------------------------------------------------*/\r
809 \r
810 void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )\r
811 {\r
812     bool destroyConnection = false;\r
813 \r
814     /* Lock the mutex protecting the reference count. */\r
815     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
816 \r
817     /* Decrement reference count. It must not be negative. */\r
818     ( pMqttConnection->references )--;\r
819     IotMqtt_Assert( pMqttConnection->references >= 0 );\r
820 \r
821     IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",\r
822                  pMqttConnection,\r
823                  ( long int ) pMqttConnection->references + 1,\r
824                  ( long int ) pMqttConnection->references );\r
825 \r
826     /* Check if this connection may be destroyed. */\r
827     if( pMqttConnection->references == 0 )\r
828     {\r
829         destroyConnection = true;\r
830     }\r
831     else\r
832     {\r
833         EMPTY_ELSE_MARKER;\r
834     }\r
835 \r
836     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
837 \r
838     /* Destroy an unreferenced MQTT connection. */\r
839     if( destroyConnection == true )\r
840     {\r
841         IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",\r
842                      pMqttConnection );\r
843         _destroyMqttConnection( pMqttConnection );\r
844     }\r
845     else\r
846     {\r
847         EMPTY_ELSE_MARKER;\r
848     }\r
849 }\r
850 \r
851 /*-----------------------------------------------------------*/\r
852 \r
853 IotMqttError_t IotMqtt_Init( void )\r
854 {\r
855     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
856 \r
857     /* Call any additional serializer initialization function if serializer\r
858      * overrides are enabled. */\r
859     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
860         #ifdef _IotMqtt_InitSerializeAdditional\r
861             if( _IotMqtt_InitSerializeAdditional() == false )\r
862             {\r
863                 status = IOT_MQTT_INIT_FAILED;\r
864             }\r
865             else\r
866             {\r
867                 EMPTY_ELSE_MARKER;\r
868             }\r
869         #endif\r
870     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
871 \r
872     /* Log initialization status. */\r
873     if( status != IOT_MQTT_SUCCESS )\r
874     {\r
875         IotLogError( "Failed to initialize MQTT library serializer. " );\r
876     }\r
877     else\r
878     {\r
879         IotLogInfo( "MQTT library successfully initialized." );\r
880     }\r
881 \r
882     return status;\r
883 }\r
884 \r
885 /*-----------------------------------------------------------*/\r
886 \r
887 void IotMqtt_Cleanup( void )\r
888 {\r
889     /* Call any additional serializer cleanup initialization function if serializer\r
890      * overrides are enabled. */\r
891     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
892         #ifdef _IotMqtt_CleanupSerializeAdditional\r
893             _IotMqtt_CleanupSerializeAdditional();\r
894         #endif\r
895     #endif\r
896 \r
897     IotLogInfo( "MQTT library cleanup done." );\r
898 }\r
899 \r
900 /*-----------------------------------------------------------*/\r
901 \r
902 IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,\r
903                                 const IotMqttConnectInfo_t * pConnectInfo,\r
904                                 uint32_t timeoutMs,\r
905                                 IotMqttConnection_t * const pMqttConnection )\r
906 {\r
907     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
908     bool networkCreated = false, ownNetworkConnection = false;\r
909     IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
910     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
911     void * pNetworkConnection = NULL;\r
912     _mqttOperation_t * pOperation = NULL;\r
913     _mqttConnection_t * pNewMqttConnection = NULL;\r
914 \r
915     /* Default CONNECT serializer function. */\r
916     IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,\r
917                                            uint8_t **,\r
918                                            size_t * ) = _IotMqtt_SerializeConnect;\r
919 \r
920     /* Network info must not be NULL. */\r
921     if( pNetworkInfo == NULL )\r
922     {\r
923         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
924     }\r
925     else\r
926     {\r
927         EMPTY_ELSE_MARKER;\r
928     }\r
929 \r
930     /* Validate network interface and connect info. */\r
931     if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )\r
932     {\r
933         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
934     }\r
935     else\r
936     {\r
937         EMPTY_ELSE_MARKER;\r
938     }\r
939 \r
940     /* If will info is provided, check that it is valid. */\r
941     if( pConnectInfo->pWillInfo != NULL )\r
942     {\r
943         if( _IotMqtt_ValidatePublish( pConnectInfo->awsIotMqttMode,\r
944                                       pConnectInfo->pWillInfo ) == false )\r
945         {\r
946             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
947         }\r
948         else if( pConnectInfo->pWillInfo->payloadLength > UINT16_MAX )\r
949         {\r
950             /* Will message payloads cannot be larger than 65535. This restriction\r
951              * applies only to will messages, and not normal PUBLISH messages. */\r
952             IotLogError( "Will payload cannot be larger than 65535." );\r
953 \r
954             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
955         }\r
956         else\r
957         {\r
958             EMPTY_ELSE_MARKER;\r
959         }\r
960     }\r
961     else\r
962     {\r
963         EMPTY_ELSE_MARKER;\r
964     }\r
965 \r
966     /* If previous subscriptions are provided, check that they are valid. */\r
967     if( pConnectInfo->cleanSession == false )\r
968     {\r
969         if( pConnectInfo->pPreviousSubscriptions != NULL )\r
970         {\r
971             if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,\r
972                                                    pConnectInfo->awsIotMqttMode,\r
973                                                    pConnectInfo->pPreviousSubscriptions,\r
974                                                    pConnectInfo->previousSubscriptionCount ) == false )\r
975             {\r
976                 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
977             }\r
978             else\r
979             {\r
980                 EMPTY_ELSE_MARKER;\r
981             }\r
982         }\r
983         else\r
984         {\r
985             EMPTY_ELSE_MARKER;\r
986         }\r
987     }\r
988     else\r
989     {\r
990         EMPTY_ELSE_MARKER;\r
991     }\r
992 \r
993     /* Create a new MQTT connection if requested. Otherwise, copy the existing\r
994      * network connection. */\r
995     if( pNetworkInfo->createNetworkConnection == true )\r
996     {\r
997         networkStatus = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo,\r
998                                                                  pNetworkInfo->u.setup.pNetworkCredentialInfo,\r
999                                                                  &pNetworkConnection );\r
1000 \r
1001         if( networkStatus == IOT_NETWORK_SUCCESS )\r
1002         {\r
1003             networkCreated = true;\r
1004 \r
1005             /* This MQTT connection owns the network connection it created and\r
1006              * should destroy it on cleanup. */\r
1007             ownNetworkConnection = true;\r
1008         }\r
1009         else\r
1010         {\r
1011             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
1012         }\r
1013     }\r
1014     else\r
1015     {\r
1016         pNetworkConnection = pNetworkInfo->u.pNetworkConnection;\r
1017         networkCreated = true;\r
1018     }\r
1019 \r
1020     IotLogInfo( "Establishing new MQTT connection." );\r
1021 \r
1022     /* Initialize a new MQTT connection object. */\r
1023     pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,\r
1024                                                 pNetworkInfo,\r
1025                                                 pConnectInfo->keepAliveSeconds );\r
1026 \r
1027     if( pNewMqttConnection == NULL )\r
1028     {\r
1029         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
1030     }\r
1031     else\r
1032     {\r
1033         /* Set the network connection associated with the MQTT connection. */\r
1034         pNewMqttConnection->pNetworkConnection = pNetworkConnection;\r
1035         pNewMqttConnection->ownNetworkConnection = ownNetworkConnection;\r
1036 \r
1037         /* Set the MQTT packet serializer overrides. */\r
1038         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1039             pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer;\r
1040         #endif\r
1041     }\r
1042 \r
1043     /* Set the MQTT receive callback. */\r
1044     networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection,\r
1045                                                                                IotMqtt_ReceiveCallback,\r
1046                                                                                pNewMqttConnection );\r
1047 \r
1048     if( networkStatus != IOT_NETWORK_SUCCESS )\r
1049     {\r
1050         IotLogError( "Failed to set MQTT network receive callback." );\r
1051 \r
1052         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
1053     }\r
1054     else\r
1055     {\r
1056         EMPTY_ELSE_MARKER;\r
1057     }\r
1058 \r
1059     /* Create a CONNECT operation. */\r
1060     status = _IotMqtt_CreateOperation( pNewMqttConnection,\r
1061                                        IOT_MQTT_FLAG_WAITABLE,\r
1062                                        NULL,\r
1063                                        &pOperation );\r
1064 \r
1065     if( status != IOT_MQTT_SUCCESS )\r
1066     {\r
1067         IOT_GOTO_CLEANUP();\r
1068     }\r
1069     else\r
1070     {\r
1071         EMPTY_ELSE_MARKER;\r
1072     }\r
1073 \r
1074     /* Ensure the members set by operation creation and serialization\r
1075      * are appropriate for a blocking CONNECT. */\r
1076     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1077     IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
1078                     == IOT_MQTT_FLAG_WAITABLE );\r
1079     IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
1080 \r
1081     /* Set the operation type. */\r
1082     pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
1083 \r
1084     /* Add previous session subscriptions. */\r
1085     if( pConnectInfo->pPreviousSubscriptions != NULL )\r
1086     {\r
1087         /* Previous subscription count should have been validated as nonzero. */\r
1088         IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 );\r
1089 \r
1090         status = _IotMqtt_AddSubscriptions( pNewMqttConnection,\r
1091                                             2,\r
1092                                             pConnectInfo->pPreviousSubscriptions,\r
1093                                             pConnectInfo->previousSubscriptionCount );\r
1094 \r
1095         if( status != IOT_MQTT_SUCCESS )\r
1096         {\r
1097             IOT_GOTO_CLEANUP();\r
1098         }\r
1099         else\r
1100         {\r
1101             EMPTY_ELSE_MARKER;\r
1102         }\r
1103     }\r
1104     else\r
1105     {\r
1106         EMPTY_ELSE_MARKER;\r
1107     }\r
1108 \r
1109     /* Choose a CONNECT serializer function. */\r
1110     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1111         if( pNewMqttConnection->pSerializer != NULL )\r
1112         {\r
1113             if( pNewMqttConnection->pSerializer->serialize.connect != NULL )\r
1114             {\r
1115                 serializeConnect = pNewMqttConnection->pSerializer->serialize.connect;\r
1116             }\r
1117             else\r
1118             {\r
1119                 EMPTY_ELSE_MARKER;\r
1120             }\r
1121         }\r
1122         else\r
1123         {\r
1124             EMPTY_ELSE_MARKER;\r
1125         }\r
1126     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
1127 \r
1128     /* Convert the connect info and will info objects to an MQTT CONNECT packet. */\r
1129     status = serializeConnect( pConnectInfo,\r
1130                                &( pOperation->u.operation.pMqttPacket ),\r
1131                                &( pOperation->u.operation.packetSize ) );\r
1132 \r
1133     if( status != IOT_MQTT_SUCCESS )\r
1134     {\r
1135         IOT_GOTO_CLEANUP();\r
1136     }\r
1137     else\r
1138     {\r
1139         EMPTY_ELSE_MARKER;\r
1140     }\r
1141 \r
1142     /* Check the serialized MQTT packet. */\r
1143     IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1144     IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1145 \r
1146     /* Add the CONNECT operation to the send queue for network transmission. */\r
1147     status = _IotMqtt_ScheduleOperation( pOperation,\r
1148                                          _IotMqtt_ProcessSend,\r
1149                                          0 );\r
1150 \r
1151     if( status != IOT_MQTT_SUCCESS )\r
1152     {\r
1153         IotLogError( "Failed to enqueue CONNECT for sending." );\r
1154     }\r
1155     else\r
1156     {\r
1157         /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */\r
1158         status = IotMqtt_Wait( pOperation,\r
1159                                timeoutMs );\r
1160 \r
1161         /* The call to wait cleans up the CONNECT operation, so set the pointer\r
1162          * to NULL. */\r
1163         pOperation = NULL;\r
1164     }\r
1165 \r
1166     /* When a connection is successfully established, schedule keep-alive job. */\r
1167     if( status == IOT_MQTT_SUCCESS )\r
1168     {\r
1169         /* Check if a keep-alive job should be scheduled. */\r
1170         if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
1171         {\r
1172             IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
1173 \r
1174             taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
1175                                                            pNewMqttConnection->pingreq.job,\r
1176                                                            pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );\r
1177 \r
1178             if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
1179             {\r
1180                 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR );\r
1181             }\r
1182             else\r
1183             {\r
1184                 EMPTY_ELSE_MARKER;\r
1185             }\r
1186         }\r
1187         else\r
1188         {\r
1189             EMPTY_ELSE_MARKER;\r
1190         }\r
1191     }\r
1192     else\r
1193     {\r
1194         EMPTY_ELSE_MARKER;\r
1195     }\r
1196 \r
1197     IOT_FUNCTION_CLEANUP_BEGIN();\r
1198 \r
1199     if( status != IOT_MQTT_SUCCESS )\r
1200     {\r
1201         IotLogError( "Failed to establish new MQTT connection, error %s.",\r
1202                      IotMqtt_strerror( status ) );\r
1203 \r
1204         /* The network connection must be closed if it was created. */\r
1205         if( networkCreated == true )\r
1206         {\r
1207             networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection );\r
1208 \r
1209             if( networkStatus != IOT_NETWORK_SUCCESS )\r
1210             {\r
1211                 IotLogWarn( "Failed to close network connection." );\r
1212             }\r
1213             else\r
1214             {\r
1215                 IotLogInfo( "Network connection closed on error." );\r
1216             }\r
1217         }\r
1218         else\r
1219         {\r
1220             EMPTY_ELSE_MARKER;\r
1221         }\r
1222 \r
1223         if( pOperation != NULL )\r
1224         {\r
1225             _IotMqtt_DestroyOperation( pOperation );\r
1226         }\r
1227         else\r
1228         {\r
1229             EMPTY_ELSE_MARKER;\r
1230         }\r
1231 \r
1232         if( pNewMqttConnection != NULL )\r
1233         {\r
1234             _destroyMqttConnection( pNewMqttConnection );\r
1235         }\r
1236         else\r
1237         {\r
1238             EMPTY_ELSE_MARKER;\r
1239         }\r
1240     }\r
1241     else\r
1242     {\r
1243         IotLogInfo( "New MQTT connection %p established.", pMqttConnection );\r
1244 \r
1245         /* Set the output parameter. */\r
1246         *pMqttConnection = pNewMqttConnection;\r
1247     }\r
1248 \r
1249     IOT_FUNCTION_CLEANUP_END();\r
1250 }\r
1251 \r
1252 /*-----------------------------------------------------------*/\r
1253 \r
1254 void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,\r
1255                          uint32_t flags )\r
1256 {\r
1257     bool disconnected = false;\r
1258     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1259     _mqttOperation_t * pOperation = NULL;\r
1260 \r
1261     IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );\r
1262 \r
1263     /* Read the connection status. */\r
1264     IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
1265     disconnected = mqttConnection->disconnected;\r
1266     IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
1267 \r
1268     /* Only send a DISCONNECT packet if the connection is active and the "cleanup only"\r
1269      * flag is not set. */\r
1270     if( disconnected == false )\r
1271     {\r
1272         if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == 0 )\r
1273         {\r
1274             /* Create a DISCONNECT operation. This function blocks until the DISCONNECT\r
1275              * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */\r
1276             status = _IotMqtt_CreateOperation( mqttConnection,\r
1277                                                IOT_MQTT_FLAG_WAITABLE,\r
1278                                                NULL,\r
1279                                                &pOperation );\r
1280 \r
1281             if( status == IOT_MQTT_SUCCESS )\r
1282             {\r
1283                 /* Ensure that the members set by operation creation and serialization\r
1284                  * are appropriate for a blocking DISCONNECT. */\r
1285                 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1286                 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
1287                                 == IOT_MQTT_FLAG_WAITABLE );\r
1288                 IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
1289 \r
1290                 /* Set the operation type. */\r
1291                 pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
1292 \r
1293                 /* Choose a disconnect serializer. */\r
1294                 IotMqttError_t ( * serializeDisconnect )( uint8_t **,\r
1295                                                           size_t * ) = _IotMqtt_SerializeDisconnect;\r
1296 \r
1297                 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1298                     if( mqttConnection->pSerializer != NULL )\r
1299                     {\r
1300                         if( mqttConnection->pSerializer->serialize.disconnect != NULL )\r
1301                         {\r
1302                             serializeDisconnect = mqttConnection->pSerializer->serialize.disconnect;\r
1303                         }\r
1304                         else\r
1305                         {\r
1306                             EMPTY_ELSE_MARKER;\r
1307                         }\r
1308                     }\r
1309                     else\r
1310                     {\r
1311                         EMPTY_ELSE_MARKER;\r
1312                     }\r
1313                 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
1314 \r
1315                 /* Generate a DISCONNECT packet. */\r
1316                 status = serializeDisconnect( &( pOperation->u.operation.pMqttPacket ),\r
1317                                               &( pOperation->u.operation.packetSize ) );\r
1318             }\r
1319             else\r
1320             {\r
1321                 EMPTY_ELSE_MARKER;\r
1322             }\r
1323 \r
1324             if( status == IOT_MQTT_SUCCESS )\r
1325             {\r
1326                 /* Check the serialized MQTT packet. */\r
1327                 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1328                 IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1329 \r
1330                 /* Schedule the DISCONNECT operation for network transmission. */\r
1331                 if( _IotMqtt_ScheduleOperation( pOperation,\r
1332                                                 _IotMqtt_ProcessSend,\r
1333                                                 0 ) != IOT_MQTT_SUCCESS )\r
1334                 {\r
1335                     IotLogWarn( "(MQTT connection %p) Failed to schedule DISCONNECT for sending.",\r
1336                                 mqttConnection );\r
1337                     _IotMqtt_DestroyOperation( pOperation );\r
1338                 }\r
1339                 else\r
1340                 {\r
1341                     /* Wait a short time for the DISCONNECT packet to be transmitted. */\r
1342                     status = IotMqtt_Wait( pOperation,\r
1343                                            IOT_MQTT_RESPONSE_WAIT_MS );\r
1344 \r
1345                     /* A wait on DISCONNECT should only ever return SUCCESS, TIMEOUT,\r
1346                      * or NETWORK ERROR. */\r
1347                     if( status == IOT_MQTT_SUCCESS )\r
1348                     {\r
1349                         IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );\r
1350                     }\r
1351                     else\r
1352                     {\r
1353                         IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||\r
1354                                         ( status == IOT_MQTT_NETWORK_ERROR ) );\r
1355 \r
1356                         IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",\r
1357                                     mqttConnection,\r
1358                                     IotMqtt_strerror( status ) );\r
1359                     }\r
1360                 }\r
1361             }\r
1362             else\r
1363             {\r
1364                 EMPTY_ELSE_MARKER;\r
1365             }\r
1366         }\r
1367         else\r
1368         {\r
1369             EMPTY_ELSE_MARKER;\r
1370         }\r
1371     }\r
1372     else\r
1373     {\r
1374         EMPTY_ELSE_MARKER;\r
1375     }\r
1376 \r
1377     /* Close the underlying network connection. This also cleans up keep-alive. */\r
1378     _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,\r
1379                                      mqttConnection );\r
1380 \r
1381     /* Check if the connection may be destroyed. */\r
1382     IotMutex_Lock( &( mqttConnection->referencesMutex ) );\r
1383 \r
1384     /* At this point, the connection should be marked disconnected. */\r
1385     IotMqtt_Assert( mqttConnection->disconnected == true );\r
1386 \r
1387     /* Attempt cancel and destroy each operation in the connection's lists. */\r
1388     IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),\r
1389                              _mqttOperation_tryDestroy,\r
1390                              offsetof( _mqttOperation_t, link ) );\r
1391 \r
1392     IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),\r
1393                              _mqttOperation_tryDestroy,\r
1394                              offsetof( _mqttOperation_t, link ) );\r
1395 \r
1396     IotMutex_Unlock( &( mqttConnection->referencesMutex ) );\r
1397 \r
1398     /* Decrement the connection reference count and destroy it if possible. */\r
1399     _IotMqtt_DecrementConnectionReferences( mqttConnection );\r
1400 }\r
1401 \r
1402 /*-----------------------------------------------------------*/\r
1403 \r
1404 IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,\r
1405                                   const IotMqttSubscription_t * pSubscriptionList,\r
1406                                   size_t subscriptionCount,\r
1407                                   uint32_t flags,\r
1408                                   const IotMqttCallbackInfo_t * pCallbackInfo,\r
1409                                   IotMqttOperation_t * const pSubscribeOperation )\r
1410 {\r
1411     return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
1412                                 mqttConnection,\r
1413                                 pSubscriptionList,\r
1414                                 subscriptionCount,\r
1415                                 flags,\r
1416                                 pCallbackInfo,\r
1417                                 pSubscribeOperation );\r
1418 }\r
1419 \r
1420 /*-----------------------------------------------------------*/\r
1421 \r
1422 IotMqttError_t IotMqtt_TimedSubscribe( IotMqttConnection_t mqttConnection,\r
1423                                        const IotMqttSubscription_t * pSubscriptionList,\r
1424                                        size_t subscriptionCount,\r
1425                                        uint32_t flags,\r
1426                                        uint32_t timeoutMs )\r
1427 {\r
1428     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1429     IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1430 \r
1431     /* Flags are not used, but the parameter is present for future compatibility. */\r
1432     ( void ) flags;\r
1433 \r
1434     /* Call the asynchronous SUBSCRIBE function. */\r
1435     status = IotMqtt_Subscribe( mqttConnection,\r
1436                                 pSubscriptionList,\r
1437                                 subscriptionCount,\r
1438                                 IOT_MQTT_FLAG_WAITABLE,\r
1439                                 NULL,\r
1440                                 &subscribeOperation );\r
1441 \r
1442     /* Wait for the SUBSCRIBE operation to complete. */\r
1443     if( status == IOT_MQTT_STATUS_PENDING )\r
1444     {\r
1445         status = IotMqtt_Wait( subscribeOperation, timeoutMs );\r
1446     }\r
1447     else\r
1448     {\r
1449         EMPTY_ELSE_MARKER;\r
1450     }\r
1451 \r
1452     /* Ensure that a status was set. */\r
1453     IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
1454 \r
1455     return status;\r
1456 }\r
1457 \r
1458 /*-----------------------------------------------------------*/\r
1459 \r
1460 IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,\r
1461                                     const IotMqttSubscription_t * pSubscriptionList,\r
1462                                     size_t subscriptionCount,\r
1463                                     uint32_t flags,\r
1464                                     const IotMqttCallbackInfo_t * pCallbackInfo,\r
1465                                     IotMqttOperation_t * const pUnsubscribeOperation )\r
1466 {\r
1467     return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
1468                                 mqttConnection,\r
1469                                 pSubscriptionList,\r
1470                                 subscriptionCount,\r
1471                                 flags,\r
1472                                 pCallbackInfo,\r
1473                                 pUnsubscribeOperation );\r
1474 }\r
1475 \r
1476 /*-----------------------------------------------------------*/\r
1477 \r
1478 IotMqttError_t IotMqtt_TimedUnsubscribe( IotMqttConnection_t mqttConnection,\r
1479                                          const IotMqttSubscription_t * pSubscriptionList,\r
1480                                          size_t subscriptionCount,\r
1481                                          uint32_t flags,\r
1482                                          uint32_t timeoutMs )\r
1483 {\r
1484     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1485     IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1486 \r
1487     /* Flags are not used, but the parameter is present for future compatibility. */\r
1488     ( void ) flags;\r
1489 \r
1490     /* Call the asynchronous UNSUBSCRIBE function. */\r
1491     status = IotMqtt_Unsubscribe( mqttConnection,\r
1492                                   pSubscriptionList,\r
1493                                   subscriptionCount,\r
1494                                   IOT_MQTT_FLAG_WAITABLE,\r
1495                                   NULL,\r
1496                                   &unsubscribeOperation );\r
1497 \r
1498     /* Wait for the UNSUBSCRIBE operation to complete. */\r
1499     if( status == IOT_MQTT_STATUS_PENDING )\r
1500     {\r
1501         status = IotMqtt_Wait( unsubscribeOperation, timeoutMs );\r
1502     }\r
1503     else\r
1504     {\r
1505         EMPTY_ELSE_MARKER;\r
1506     }\r
1507 \r
1508     /* Ensure that a status was set. */\r
1509     IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING );\r
1510 \r
1511     return status;\r
1512 }\r
1513 \r
1514 /*-----------------------------------------------------------*/\r
1515 \r
1516 IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,\r
1517                                 const IotMqttPublishInfo_t * pPublishInfo,\r
1518                                 uint32_t flags,\r
1519                                 const IotMqttCallbackInfo_t * pCallbackInfo,\r
1520                                 IotMqttOperation_t * const pPublishOperation )\r
1521 {\r
1522     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
1523     _mqttOperation_t * pOperation = NULL;\r
1524     uint8_t ** pPacketIdentifierHigh = NULL;\r
1525 \r
1526     /* Default PUBLISH serializer function. */\r
1527     IotMqttError_t ( * serializePublish )( const IotMqttPublishInfo_t *,\r
1528                                            uint8_t **,\r
1529                                            size_t *,\r
1530                                            uint16_t *,\r
1531                                            uint8_t ** ) = _IotMqtt_SerializePublish;\r
1532 \r
1533     /* Check that the PUBLISH information is valid. */\r
1534     if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,\r
1535                                   pPublishInfo ) == false )\r
1536     {\r
1537         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1538     }\r
1539     else\r
1540     {\r
1541         EMPTY_ELSE_MARKER;\r
1542     }\r
1543 \r
1544     /* Check that no notification is requested for a QoS 0 publish. */\r
1545     if( pPublishInfo->qos == IOT_MQTT_QOS_0 )\r
1546     {\r
1547         if( pCallbackInfo != NULL )\r
1548         {\r
1549             IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
1550 \r
1551             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1552         }\r
1553         else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )\r
1554         {\r
1555             IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );\r
1556 \r
1557             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1558         }\r
1559         else\r
1560         {\r
1561             EMPTY_ELSE_MARKER;\r
1562         }\r
1563 \r
1564         if( pPublishOperation != NULL )\r
1565         {\r
1566             IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );\r
1567         }\r
1568         else\r
1569         {\r
1570             EMPTY_ELSE_MARKER;\r
1571         }\r
1572     }\r
1573     else\r
1574     {\r
1575         EMPTY_ELSE_MARKER;\r
1576     }\r
1577 \r
1578     /* Check that a reference pointer is provided for a waitable operation. */\r
1579     if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
1580     {\r
1581         if( pPublishOperation == NULL )\r
1582         {\r
1583             IotLogError( "Reference must be provided for a waitable PUBLISH." );\r
1584 \r
1585             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
1586         }\r
1587         else\r
1588         {\r
1589             EMPTY_ELSE_MARKER;\r
1590         }\r
1591     }\r
1592     else\r
1593     {\r
1594         EMPTY_ELSE_MARKER;\r
1595     }\r
1596 \r
1597     /* Create a PUBLISH operation. */\r
1598     status = _IotMqtt_CreateOperation( mqttConnection,\r
1599                                        flags,\r
1600                                        pCallbackInfo,\r
1601                                        &pOperation );\r
1602 \r
1603     if( status != IOT_MQTT_SUCCESS )\r
1604     {\r
1605         IOT_GOTO_CLEANUP();\r
1606     }\r
1607     else\r
1608     {\r
1609         EMPTY_ELSE_MARKER;\r
1610     }\r
1611 \r
1612     /* Check the PUBLISH operation data and set the operation type. */\r
1613     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
1614     pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;\r
1615 \r
1616     /* Choose a PUBLISH serializer function. */\r
1617     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
1618         if( mqttConnection->pSerializer != NULL )\r
1619         {\r
1620             if( mqttConnection->pSerializer->serialize.publish != NULL )\r
1621             {\r
1622                 serializePublish = mqttConnection->pSerializer->serialize.publish;\r
1623             }\r
1624             else\r
1625             {\r
1626                 EMPTY_ELSE_MARKER;\r
1627             }\r
1628         }\r
1629         else\r
1630         {\r
1631             EMPTY_ELSE_MARKER;\r
1632         }\r
1633     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
1634 \r
1635     /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */\r
1636     if( mqttConnection->awsIotMqttMode == true )\r
1637     {\r
1638         pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );\r
1639     }\r
1640     else\r
1641     {\r
1642         EMPTY_ELSE_MARKER;\r
1643     }\r
1644 \r
1645     /* Generate a PUBLISH packet from pPublishInfo. */\r
1646     status = serializePublish( pPublishInfo,\r
1647                                &( pOperation->u.operation.pMqttPacket ),\r
1648                                &( pOperation->u.operation.packetSize ),\r
1649                                &( pOperation->u.operation.packetIdentifier ),\r
1650                                pPacketIdentifierHigh );\r
1651 \r
1652     if( status != IOT_MQTT_SUCCESS )\r
1653     {\r
1654         IOT_GOTO_CLEANUP();\r
1655     }\r
1656     else\r
1657     {\r
1658         EMPTY_ELSE_MARKER;\r
1659     }\r
1660 \r
1661     /* Check the serialized MQTT packet. */\r
1662     IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
1663     IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );\r
1664 \r
1665     /* Initialize PUBLISH retry if retryLimit is set. */\r
1666     if( pPublishInfo->retryLimit > 0 )\r
1667     {\r
1668         /* A QoS 0 PUBLISH may not be retried. */\r
1669         if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1670         {\r
1671             pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;\r
1672             pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;\r
1673         }\r
1674         else\r
1675         {\r
1676             EMPTY_ELSE_MARKER;\r
1677         }\r
1678     }\r
1679     else\r
1680     {\r
1681         EMPTY_ELSE_MARKER;\r
1682     }\r
1683 \r
1684     /* Set the reference, if provided. */\r
1685     if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1686     {\r
1687         if( pPublishOperation != NULL )\r
1688         {\r
1689             *pPublishOperation = pOperation;\r
1690         }\r
1691         else\r
1692         {\r
1693             EMPTY_ELSE_MARKER;\r
1694         }\r
1695     }\r
1696     else\r
1697     {\r
1698         EMPTY_ELSE_MARKER;\r
1699     }\r
1700 \r
1701     /* Add the PUBLISH operation to the send queue for network transmission. */\r
1702     status = _IotMqtt_ScheduleOperation( pOperation,\r
1703                                          _IotMqtt_ProcessSend,\r
1704                                          0 );\r
1705 \r
1706     if( status != IOT_MQTT_SUCCESS )\r
1707     {\r
1708         IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",\r
1709                      mqttConnection );\r
1710 \r
1711         /* Clear the previously set (and now invalid) reference. */\r
1712         if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
1713         {\r
1714             if( pPublishOperation != NULL )\r
1715             {\r
1716                 *pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;\r
1717             }\r
1718             else\r
1719             {\r
1720                 EMPTY_ELSE_MARKER;\r
1721             }\r
1722         }\r
1723         else\r
1724         {\r
1725             EMPTY_ELSE_MARKER;\r
1726         }\r
1727 \r
1728         IOT_GOTO_CLEANUP();\r
1729     }\r
1730     else\r
1731     {\r
1732         EMPTY_ELSE_MARKER;\r
1733     }\r
1734 \r
1735     /* Clean up the PUBLISH operation if this function fails. Otherwise, set the\r
1736      * appropriate return code based on QoS. */\r
1737     IOT_FUNCTION_CLEANUP_BEGIN();\r
1738 \r
1739     if( status != IOT_MQTT_SUCCESS )\r
1740     {\r
1741         if( pOperation != NULL )\r
1742         {\r
1743             _IotMqtt_DestroyOperation( pOperation );\r
1744         }\r
1745         else\r
1746         {\r
1747             EMPTY_ELSE_MARKER;\r
1748         }\r
1749     }\r
1750     else\r
1751     {\r
1752         if( pPublishInfo->qos > IOT_MQTT_QOS_0 )\r
1753         {\r
1754             status = IOT_MQTT_STATUS_PENDING;\r
1755         }\r
1756         else\r
1757         {\r
1758             EMPTY_ELSE_MARKER;\r
1759         }\r
1760 \r
1761         IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",\r
1762                     mqttConnection );\r
1763     }\r
1764 \r
1765     IOT_FUNCTION_CLEANUP_END();\r
1766 }\r
1767 \r
1768 /*-----------------------------------------------------------*/\r
1769 \r
1770 IotMqttError_t IotMqtt_TimedPublish( IotMqttConnection_t mqttConnection,\r
1771                                      const IotMqttPublishInfo_t * pPublishInfo,\r
1772                                      uint32_t flags,\r
1773                                      uint32_t timeoutMs )\r
1774 {\r
1775     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
1776     IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER,\r
1777                        * pPublishOperation = NULL;\r
1778 \r
1779     /* Clear the flags. */\r
1780     flags = 0;\r
1781 \r
1782     /* Set the waitable flag and reference for QoS 1 PUBLISH. */\r
1783     if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
1784     {\r
1785         flags = IOT_MQTT_FLAG_WAITABLE;\r
1786         pPublishOperation = &publishOperation;\r
1787     }\r
1788     else\r
1789     {\r
1790         EMPTY_ELSE_MARKER;\r
1791     }\r
1792 \r
1793     /* Call the asynchronous PUBLISH function. */\r
1794     status = IotMqtt_Publish( mqttConnection,\r
1795                               pPublishInfo,\r
1796                               flags,\r
1797                               NULL,\r
1798                               pPublishOperation );\r
1799 \r
1800     /* Wait for a queued QoS 1 PUBLISH to complete. */\r
1801     if( pPublishInfo->qos == IOT_MQTT_QOS_1 )\r
1802     {\r
1803         if( status == IOT_MQTT_STATUS_PENDING )\r
1804         {\r
1805             status = IotMqtt_Wait( publishOperation, timeoutMs );\r
1806         }\r
1807         else\r
1808         {\r
1809             EMPTY_ELSE_MARKER;\r
1810         }\r
1811     }\r
1812     else\r
1813     {\r
1814         EMPTY_ELSE_MARKER;\r
1815     }\r
1816 \r
1817     return status;\r
1818 }\r
1819 \r
1820 /*-----------------------------------------------------------*/\r
1821 \r
1822 IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,\r
1823                              uint32_t timeoutMs )\r
1824 {\r
1825     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
1826     _mqttConnection_t * pMqttConnection = operation->pMqttConnection;\r
1827 \r
1828     /* Validate the given operation reference. */\r
1829     if( _IotMqtt_ValidateOperation( operation ) == false )\r
1830     {\r
1831         status = IOT_MQTT_BAD_PARAMETER;\r
1832     }\r
1833     else\r
1834     {\r
1835         EMPTY_ELSE_MARKER;\r
1836     }\r
1837 \r
1838     /* Check the MQTT connection status. */\r
1839     if( status == IOT_MQTT_SUCCESS )\r
1840     {\r
1841         IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
1842 \r
1843         if( pMqttConnection->disconnected == true )\r
1844         {\r
1845             IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "\r
1846                          "Operation cannot be waited on.",\r
1847                          pMqttConnection,\r
1848                          IotMqtt_OperationType( operation->u.operation.type ),\r
1849                          operation );\r
1850 \r
1851             status = IOT_MQTT_NETWORK_ERROR;\r
1852         }\r
1853         else\r
1854         {\r
1855             IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",\r
1856                         pMqttConnection,\r
1857                         IotMqtt_OperationType( operation->u.operation.type ),\r
1858                         operation );\r
1859         }\r
1860 \r
1861         IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
1862 \r
1863         /* Only wait on an operation if the MQTT connection is active. */\r
1864         if( status == IOT_MQTT_SUCCESS )\r
1865         {\r
1866             if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),\r
1867                                         timeoutMs ) == false )\r
1868             {\r
1869                 status = IOT_MQTT_TIMEOUT;\r
1870 \r
1871                 /* Attempt to cancel the job of the timed out operation. */\r
1872                 ( void ) _IotMqtt_DecrementOperationReferences( operation, true );\r
1873 \r
1874                 /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */\r
1875                 if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
1876                 {\r
1877                     IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"\r
1878                                  " subscriptions of timed-out SUBSCRIBE.",\r
1879                                  pMqttConnection,\r
1880                                  operation );\r
1881 \r
1882                     _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,\r
1883                                                          operation->u.operation.packetIdentifier,\r
1884                                                          -1 );\r
1885                 }\r
1886                 else\r
1887                 {\r
1888                     EMPTY_ELSE_MARKER;\r
1889                 }\r
1890             }\r
1891             else\r
1892             {\r
1893                 /* Retrieve the status of the completed operation. */\r
1894                 status = operation->u.operation.status;\r
1895             }\r
1896 \r
1897             IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",\r
1898                         pMqttConnection,\r
1899                         IotMqtt_OperationType( operation->u.operation.type ),\r
1900                         operation,\r
1901                         IotMqtt_strerror( status ) );\r
1902         }\r
1903         else\r
1904         {\r
1905             EMPTY_ELSE_MARKER;\r
1906         }\r
1907 \r
1908         /* Wait is finished; decrement operation reference count. */\r
1909         if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )\r
1910         {\r
1911             _IotMqtt_DestroyOperation( operation );\r
1912         }\r
1913         else\r
1914         {\r
1915             EMPTY_ELSE_MARKER;\r
1916         }\r
1917     }\r
1918     else\r
1919     {\r
1920         EMPTY_ELSE_MARKER;\r
1921     }\r
1922 \r
1923     return status;\r
1924 }\r
1925 \r
1926 /*-----------------------------------------------------------*/\r
1927 \r
1928 const char * IotMqtt_strerror( IotMqttError_t status )\r
1929 {\r
1930     const char * pMessage = NULL;\r
1931 \r
1932     switch( status )\r
1933     {\r
1934         case IOT_MQTT_SUCCESS:\r
1935             pMessage = "SUCCESS";\r
1936             break;\r
1937 \r
1938         case IOT_MQTT_STATUS_PENDING:\r
1939             pMessage = "PENDING";\r
1940             break;\r
1941 \r
1942         case IOT_MQTT_INIT_FAILED:\r
1943             pMessage = "INITIALIZATION FAILED";\r
1944             break;\r
1945 \r
1946         case IOT_MQTT_BAD_PARAMETER:\r
1947             pMessage = "BAD PARAMETER";\r
1948             break;\r
1949 \r
1950         case IOT_MQTT_NO_MEMORY:\r
1951             pMessage = "NO MEMORY";\r
1952             break;\r
1953 \r
1954         case IOT_MQTT_NETWORK_ERROR:\r
1955             pMessage = "NETWORK ERROR";\r
1956             break;\r
1957 \r
1958         case IOT_MQTT_SCHEDULING_ERROR:\r
1959             pMessage = "SCHEDULING ERROR";\r
1960             break;\r
1961 \r
1962         case IOT_MQTT_BAD_RESPONSE:\r
1963             pMessage = "BAD RESPONSE RECEIVED";\r
1964             break;\r
1965 \r
1966         case IOT_MQTT_TIMEOUT:\r
1967             pMessage = "TIMEOUT";\r
1968             break;\r
1969 \r
1970         case IOT_MQTT_SERVER_REFUSED:\r
1971             pMessage = "SERVER REFUSED";\r
1972             break;\r
1973 \r
1974         case IOT_MQTT_RETRY_NO_RESPONSE:\r
1975             pMessage = "NO RESPONSE";\r
1976             break;\r
1977 \r
1978         default:\r
1979             pMessage = "INVALID STATUS";\r
1980             break;\r
1981     }\r
1982 \r
1983     return pMessage;\r
1984 }\r
1985 \r
1986 /*-----------------------------------------------------------*/\r
1987 \r
1988 const char * IotMqtt_OperationType( IotMqttOperationType_t operation )\r
1989 {\r
1990     const char * pMessage = NULL;\r
1991 \r
1992     switch( operation )\r
1993     {\r
1994         case IOT_MQTT_CONNECT:\r
1995             pMessage = "CONNECT";\r
1996             break;\r
1997 \r
1998         case IOT_MQTT_PUBLISH_TO_SERVER:\r
1999             pMessage = "PUBLISH";\r
2000             break;\r
2001 \r
2002         case IOT_MQTT_PUBACK:\r
2003             pMessage = "PUBACK";\r
2004             break;\r
2005 \r
2006         case IOT_MQTT_SUBSCRIBE:\r
2007             pMessage = "SUBSCRIBE";\r
2008             break;\r
2009 \r
2010         case IOT_MQTT_UNSUBSCRIBE:\r
2011             pMessage = "UNSUBSCRIBE";\r
2012             break;\r
2013 \r
2014         case IOT_MQTT_PINGREQ:\r
2015             pMessage = "PINGREQ";\r
2016             break;\r
2017 \r
2018         case IOT_MQTT_DISCONNECT:\r
2019             pMessage = "DISCONNECT";\r
2020             break;\r
2021 \r
2022         default:\r
2023             pMessage = "INVALID OPERATION";\r
2024             break;\r
2025     }\r
2026 \r
2027     return pMessage;\r
2028 }\r
2029 \r
2030 /*-----------------------------------------------------------*/\r
2031 \r
2032 /* Provide access to internal functions and variables if testing. */\r
2033 #if IOT_BUILD_TESTS == 1\r
2034     #include "iot_test_access_mqtt_api.c"\r
2035 #endif\r