]> git.sur5r.net Git - freertos/blob - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
Remove IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ) from MQTT code
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_operation.c
1 /*\r
2  * Amazon FreeRTOS MQTT V2.0.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://aws.amazon.com/freertos\r
23  * http://www.FreeRTOS.org\r
24  */\r
25 \r
26 /**\r
27  * @file iot_mqtt_operation.c\r
28  * @brief Implements functions that process MQTT operations.\r
29  */\r
30 \r
31 /* The config header is always included first. */\r
32 #include "iot_config.h"\r
33 \r
34 /* Standard includes. */\r
35 #include <string.h>\r
36 \r
37 /* Error handling include. */\r
38 #include "private/iot_error.h"\r
39 \r
40 /* MQTT internal include. */\r
41 #include "private/iot_mqtt_internal.h"\r
42 \r
43 /* Platform layer includes. */\r
44 #include "platform/iot_clock.h"\r
45 #include "platform/iot_threads.h"\r
46 \r
47 /*-----------------------------------------------------------*/\r
48 \r
49 /**\r
50  * @brief First parameter to #_mqttOperation_match.\r
51  */\r
52 typedef struct _operationMatchParam\r
53 {\r
54     IotMqttOperationType_t type;        /**< @brief The type of operation to look for. */\r
55     const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation.\r
56                                          * Set to `NULL` to ignore packet identifier. */\r
57 } _operationMatchParam_t;\r
58 \r
59 /*-----------------------------------------------------------*/\r
60 \r
61 /**\r
62  * @brief Match an MQTT operation by type and packet identifier.\r
63  *\r
64  * @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t.\r
65  * @param[in] pMatch Pointer to an #_operationMatchParam_t.\r
66  *\r
67  * @return `true` if the operation matches the parameters in `pMatch`; `false`\r
68  * otherwise.\r
69  */\r
70 static bool _mqttOperation_match( const IotLink_t * pOperationLink,\r
71                                   void * pMatch );\r
72 \r
73 /**\r
74  * @brief Check if an operation with retry has exceeded its retry limit.\r
75  *\r
76  * If a PUBLISH operation is available for retry, this function also sets any\r
77  * necessary DUP flags.\r
78  *\r
79  * @param[in] pOperation The operation to check.\r
80  *\r
81  * @return `true` if the operation may be retried; `false` otherwise.\r
82  */\r
83 static bool _checkRetryLimit( _mqttOperation_t * pOperation );\r
84 \r
85 /**\r
86  * @brief Schedule the next send of an operation with retry.\r
87  *\r
88  * @param[in] pOperation The operation to schedule.\r
89  *\r
90  * @return `true` if the reschedule succeeded; `false` otherwise.\r
91  */\r
92 static bool _scheduleNextRetry( _mqttOperation_t * pOperation );\r
93 \r
94 /*-----------------------------------------------------------*/\r
95 \r
96 static bool _mqttOperation_match( const IotLink_t * pOperationLink,\r
97                                   void * pMatch )\r
98 {\r
99     bool match = false;\r
100 \r
101     /* Because this function is called from a container function, the given link\r
102      * must never be NULL. */\r
103     IotMqtt_Assert( pOperationLink != NULL );\r
104 \r
105     _mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t,\r
106                                                        pOperationLink,\r
107                                                        link );\r
108     _operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch;\r
109 \r
110     /* Check for matching operations. */\r
111     if( pParam->type == pOperation->u.operation.type )\r
112     {\r
113         /* Check for matching packet identifiers. */\r
114         if( pParam->pPacketIdentifier == NULL )\r
115         {\r
116             match = true;\r
117         }\r
118         else\r
119         {\r
120             match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier );\r
121         }\r
122     }\r
123     else\r
124     {\r
125         EMPTY_ELSE_MARKER;\r
126     }\r
127 \r
128     return match;\r
129 }\r
130 \r
131 /*-----------------------------------------------------------*/\r
132 \r
133 static bool _checkRetryLimit( _mqttOperation_t * pOperation )\r
134 {\r
135     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
136     bool status = true;\r
137 \r
138     /* Choose a set DUP function. */\r
139     void ( * publishSetDup )( uint8_t *,\r
140                               uint8_t *,\r
141                               uint16_t * ) = _IotMqtt_PublishSetDup;\r
142 \r
143     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
144         if( pMqttConnection->pSerializer != NULL )\r
145         {\r
146             if( pMqttConnection->pSerializer->serialize.publishSetDup != NULL )\r
147             {\r
148                 publishSetDup = pMqttConnection->pSerializer->serialize.publishSetDup;\r
149             }\r
150             else\r
151             {\r
152                 EMPTY_ELSE_MARKER;\r
153             }\r
154         }\r
155         else\r
156         {\r
157             EMPTY_ELSE_MARKER;\r
158         }\r
159     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
160 \r
161     /* Only PUBLISH may be retried. */\r
162     IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );\r
163 \r
164     /* Check if the retry limit is exhausted. */\r
165     if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
166     {\r
167         /* The retry count may be at most one more than the retry limit, which\r
168          * accounts for the final check for a PUBACK. */\r
169         IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 );\r
170 \r
171         IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",\r
172                      pMqttConnection,\r
173                      pOperation,\r
174                      pOperation->u.operation.retry.limit );\r
175 \r
176         status = false;\r
177     }\r
178     /* Check if this is the first retry. */\r
179     else if( pOperation->u.operation.retry.count == 1 )\r
180     {\r
181         /* Always set the DUP flag on the first retry. */\r
182         publishSetDup( pOperation->u.operation.pMqttPacket,\r
183                        pOperation->u.operation.pPacketIdentifierHigh,\r
184                        &( pOperation->u.operation.packetIdentifier ) );\r
185     }\r
186     else\r
187     {\r
188         /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet\r
189          * identifier) must be reset on every retry. */\r
190         if( pMqttConnection->awsIotMqttMode == true )\r
191         {\r
192             publishSetDup( pOperation->u.operation.pMqttPacket,\r
193                            pOperation->u.operation.pPacketIdentifierHigh,\r
194                            &( pOperation->u.operation.packetIdentifier ) );\r
195         }\r
196         else\r
197         {\r
198             EMPTY_ELSE_MARKER;\r
199         }\r
200     }\r
201 \r
202     return status;\r
203 }\r
204 \r
205 /*-----------------------------------------------------------*/\r
206 \r
207 static bool _scheduleNextRetry( _mqttOperation_t * pOperation )\r
208 {\r
209     bool firstRetry = false;\r
210     uint32_t scheduleDelay = 0;\r
211     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
212     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
213 \r
214     /* This function should never be called with retry count greater than\r
215      * retry limit. */\r
216     IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit );\r
217 \r
218     /* Increment the retry count. */\r
219     ( pOperation->u.operation.retry.count )++;\r
220 \r
221     /* Check for a response shortly for the final retry. Otherwise, calculate the\r
222      * next retry period. */\r
223     if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )\r
224     {\r
225         scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;\r
226 \r
227         IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check "\r
228                      "for response in %d ms.",\r
229                      pMqttConnection,\r
230                      pOperation,\r
231                      IOT_MQTT_RESPONSE_WAIT_MS );\r
232     }\r
233     else\r
234     {\r
235         scheduleDelay = pOperation->u.operation.retry.nextPeriod;\r
236 \r
237         /* Double the retry period, subject to a ceiling value. */\r
238         pOperation->u.operation.retry.nextPeriod *= 2;\r
239 \r
240         if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING )\r
241         {\r
242             pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING;\r
243         }\r
244         else\r
245         {\r
246             EMPTY_ELSE_MARKER;\r
247         }\r
248 \r
249         IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",\r
250                      pMqttConnection,\r
251                      pOperation,\r
252                      ( unsigned long ) pOperation->u.operation.retry.count,\r
253                      ( unsigned long ) pOperation->u.operation.retry.limit,\r
254                      ( unsigned long ) scheduleDelay );\r
255 \r
256         /* Check if this is the first retry. */\r
257         firstRetry = ( pOperation->u.operation.retry.count == 1 );\r
258 \r
259         /* On the first retry, the PUBLISH will be moved from the pending processing\r
260          * list to the pending responses list. Lock the connection references mutex\r
261          * to manipulate the lists. */\r
262         if( firstRetry == true )\r
263         {\r
264             IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
265         }\r
266         else\r
267         {\r
268             EMPTY_ELSE_MARKER;\r
269         }\r
270     }\r
271 \r
272     /* Reschedule the PUBLISH for another send. */\r
273     status = _IotMqtt_ScheduleOperation( pOperation,\r
274                                          _IotMqtt_ProcessSend,\r
275                                          scheduleDelay );\r
276 \r
277     /* Check for successful reschedule. */\r
278     if( status == IOT_MQTT_SUCCESS )\r
279     {\r
280         /* Move a successfully rescheduled PUBLISH from the pending processing\r
281          * list to the pending responses list on the first retry. */\r
282         if( firstRetry == true )\r
283         {\r
284             /* Operation must be linked. */\r
285             IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) == true );\r
286 \r
287             /* Transfer to pending response list. */\r
288             IotListDouble_Remove( &( pOperation->link ) );\r
289             IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),\r
290                                       &( pOperation->link ) );\r
291         }\r
292         else\r
293         {\r
294             EMPTY_ELSE_MARKER;\r
295         }\r
296     }\r
297     else\r
298     {\r
299         EMPTY_ELSE_MARKER;\r
300     }\r
301 \r
302     /* The references mutex only needs to be unlocked on the first retry, since\r
303      * only the first retry manipulates the connection lists. */\r
304     if( firstRetry == true )\r
305     {\r
306         IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
307     }\r
308     else\r
309     {\r
310         EMPTY_ELSE_MARKER;\r
311     }\r
312 \r
313     return( status == IOT_MQTT_SUCCESS );\r
314 }\r
315 \r
316 /*-----------------------------------------------------------*/\r
317 \r
318 IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,\r
319                                          uint32_t flags,\r
320                                          const IotMqttCallbackInfo_t * pCallbackInfo,\r
321                                          _mqttOperation_t ** pNewOperation )\r
322 {\r
323     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
324     bool decrementOnError = false;\r
325     _mqttOperation_t * pOperation = NULL;\r
326     bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );\r
327 \r
328     /* If the waitable flag is set, make sure that there's no callback. */\r
329     if( waitable == true )\r
330     {\r
331         if( pCallbackInfo != NULL )\r
332         {\r
333             IotLogError( "Callback should not be set for a waitable operation." );\r
334 \r
335             return IOT_MQTT_BAD_PARAMETER;\r
336         }\r
337         else\r
338         {\r
339             EMPTY_ELSE_MARKER;\r
340         }\r
341     }\r
342     else\r
343     {\r
344         EMPTY_ELSE_MARKER;\r
345     }\r
346 \r
347     IotLogDebug( "(MQTT connection %p) Creating new operation record.",\r
348                  pMqttConnection );\r
349 \r
350     /* Increment the reference count for the MQTT connection when creating a new\r
351      * operation. */\r
352     if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false )\r
353     {\r
354         IotLogError( "(MQTT connection %p) New operation record cannot be created"\r
355                      " for a closed connection",\r
356                      pMqttConnection );\r
357 \r
358         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
359     }\r
360     else\r
361     {\r
362         /* Reference count will need to be decremented on error. */\r
363         decrementOnError = true;\r
364     }\r
365 \r
366     /* Allocate memory for a new operation. */\r
367     pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );\r
368 \r
369     if( pOperation == NULL )\r
370     {\r
371         IotLogError( "(MQTT connection %p) Failed to allocate memory for new "\r
372                      "operation record.",\r
373                      pMqttConnection );\r
374 \r
375         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
376     }\r
377     else\r
378     {\r
379         /* Clear the operation data. */\r
380         ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );\r
381 \r
382         /* Initialize some members of the new operation. */\r
383         pOperation->pMqttConnection = pMqttConnection;\r
384         pOperation->u.operation.jobReference = 1;\r
385         pOperation->u.operation.flags = flags;\r
386         pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING;\r
387     }\r
388 \r
389     /* Check if the waitable flag is set. If it is, create a semaphore to\r
390      * wait on. */\r
391     if( waitable == true )\r
392     {\r
393         /* Create a semaphore to wait on for a waitable operation. */\r
394         if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false )\r
395         {\r
396             IotLogError( "(MQTT connection %p) Failed to create semaphore for "\r
397                          "waitable operation.",\r
398                          pMqttConnection );\r
399 \r
400             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
401         }\r
402         else\r
403         {\r
404             /* A waitable operation is created with an additional reference for the\r
405              * Wait function. */\r
406             ( pOperation->u.operation.jobReference )++;\r
407         }\r
408     }\r
409     else\r
410     {\r
411         /* If the waitable flag isn't set but a callback is, copy the callback\r
412          * information. */\r
413         if( pCallbackInfo != NULL )\r
414         {\r
415             pOperation->u.operation.notify.callback = *pCallbackInfo;\r
416         }\r
417         else\r
418         {\r
419             EMPTY_ELSE_MARKER;\r
420         }\r
421     }\r
422 \r
423     /* Add this operation to the MQTT connection's operation list. */\r
424     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
425     IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
426                               &( pOperation->link ) );\r
427     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
428 \r
429     /* Set the output parameter. */\r
430     *pNewOperation = pOperation;\r
431 \r
432     /* Clean up operation and decrement reference count if this function failed. */\r
433     IOT_FUNCTION_CLEANUP_BEGIN();\r
434 \r
435     if( status != IOT_MQTT_SUCCESS )\r
436     {\r
437         if( decrementOnError == true )\r
438         {\r
439             _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
440         }\r
441         else\r
442         {\r
443             EMPTY_ELSE_MARKER;\r
444         }\r
445 \r
446         if( pOperation != NULL )\r
447         {\r
448             IotMqtt_FreeOperation( pOperation );\r
449         }\r
450         else\r
451         {\r
452             EMPTY_ELSE_MARKER;\r
453         }\r
454     }\r
455     else\r
456     {\r
457         EMPTY_ELSE_MARKER;\r
458     }\r
459 \r
460     IOT_FUNCTION_CLEANUP_END();\r
461 }\r
462 \r
463 /*-----------------------------------------------------------*/\r
464 \r
465 bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,\r
466                                             bool cancelJob )\r
467 {\r
468     bool destroyOperation = false;\r
469     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
470     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
471 \r
472     /* Attempt to cancel the operation's job. */\r
473     if( cancelJob == true )\r
474     {\r
475         taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
476                                                 pOperation->job,\r
477                                                 NULL );\r
478 \r
479         /* If the operation's job was not canceled, it must be already executing.\r
480          * Any other return value is invalid. */\r
481         IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
482                         ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
483 \r
484         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
485         {\r
486             IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.",\r
487                          pMqttConnection,\r
488                          IotMqtt_OperationType( pOperation->u.operation.type ),\r
489                          pOperation );\r
490         }\r
491         else\r
492         {\r
493             EMPTY_ELSE_MARKER;\r
494         }\r
495     }\r
496     else\r
497     {\r
498         EMPTY_ELSE_MARKER;\r
499     }\r
500 \r
501     /* Decrement job reference count. */\r
502     if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
503     {\r
504         IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
505         pOperation->u.operation.jobReference--;\r
506 \r
507         IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed"\r
508                      " from %ld to %ld.",\r
509                      pMqttConnection,\r
510                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
511                      pOperation,\r
512                      pOperation->u.operation.jobReference + 1,\r
513                      pOperation->u.operation.jobReference );\r
514 \r
515         /* The job reference count must be 0 or 1 after the decrement. */\r
516         IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||\r
517                         ( pOperation->u.operation.jobReference == 1 ) );\r
518 \r
519         /* This operation may be destroyed if its reference count is 0. */\r
520         if( pOperation->u.operation.jobReference == 0 )\r
521         {\r
522             destroyOperation = true;\r
523         }\r
524         else\r
525         {\r
526             EMPTY_ELSE_MARKER;\r
527         }\r
528 \r
529         IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
530     }\r
531     else\r
532     {\r
533         EMPTY_ELSE_MARKER;\r
534     }\r
535 \r
536     return destroyOperation;\r
537 }\r
538 \r
539 /*-----------------------------------------------------------*/\r
540 \r
541 void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation )\r
542 {\r
543     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
544 \r
545     /* Default free packet function. */\r
546     void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
547 \r
548     IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.",\r
549                  pMqttConnection,\r
550                  IotMqtt_OperationType( pOperation->u.operation.type ),\r
551                  pOperation );\r
552 \r
553     /* The job reference count must be between 0 and 2. */\r
554     IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) &&\r
555                     ( pOperation->u.operation.jobReference <= 2 ) );\r
556 \r
557     /* Jobs to be destroyed should be removed from the MQTT connection's\r
558      * lists. */\r
559     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
560 \r
561     if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
562     {\r
563         IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.",\r
564                      pMqttConnection,\r
565                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
566                      pOperation,\r
567                      pMqttConnection );\r
568 \r
569         IotListDouble_Remove( &( pOperation->link ) );\r
570     }\r
571     else\r
572     {\r
573         IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.",\r
574                      pMqttConnection,\r
575                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
576                      pOperation );\r
577     }\r
578 \r
579     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
580 \r
581     /* Free any allocated MQTT packet. */\r
582     if( pOperation->u.operation.pMqttPacket != NULL )\r
583     {\r
584         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
585             if( pMqttConnection->pSerializer != NULL )\r
586             {\r
587                 if( pMqttConnection->pSerializer->freePacket != NULL )\r
588                 {\r
589                     freePacket = pMqttConnection->pSerializer->freePacket;\r
590                 }\r
591                 else\r
592                 {\r
593                     EMPTY_ELSE_MARKER;\r
594                 }\r
595             }\r
596             else\r
597             {\r
598                 EMPTY_ELSE_MARKER;\r
599             }\r
600         #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
601 \r
602         freePacket( pOperation->u.operation.pMqttPacket );\r
603 \r
604         IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.",\r
605                      pMqttConnection,\r
606                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
607                      pOperation );\r
608     }\r
609     else\r
610     {\r
611         IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.",\r
612                      pMqttConnection,\r
613                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
614                      pOperation );\r
615     }\r
616 \r
617     /* Check if a wait semaphore was created for this operation. */\r
618     if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
619     {\r
620         IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) );\r
621 \r
622         IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.",\r
623                      pMqttConnection,\r
624                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
625                      pOperation );\r
626     }\r
627     else\r
628     {\r
629         EMPTY_ELSE_MARKER;\r
630     }\r
631 \r
632     IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.",\r
633                  pMqttConnection,\r
634                  IotMqtt_OperationType( pOperation->u.operation.type ),\r
635                  pOperation );\r
636 \r
637     /* Free the memory used to hold operation data. */\r
638     IotMqtt_FreeOperation( pOperation );\r
639 \r
640     /* Decrement the MQTT connection's reference count after destroying an\r
641      * operation. */\r
642     _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
643 }\r
644 \r
645 /*-----------------------------------------------------------*/\r
646 \r
647 void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,\r
648                                 IotTaskPoolJob_t pKeepAliveJob,\r
649                                 void * pContext )\r
650 {\r
651     bool status = true;\r
652     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
653     size_t bytesSent = 0;\r
654 \r
655     /* Retrieve the MQTT connection from the context. */\r
656     _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;\r
657 \r
658     /* Check parameters. */\r
659     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
660     IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob );\r
661 \r
662     /* Check that keep-alive interval is valid. The MQTT spec states its maximum\r
663      * value is 65,535 seconds. */\r
664     IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 );\r
665 \r
666     /* Only two values are valid for the next keep alive job delay. */\r
667     IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) ||\r
668                     ( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) );\r
669 \r
670     IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );\r
671 \r
672     /* Re-create the keep-alive job for rescheduling. This should never fail. */\r
673     taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
674                                             pContext,\r
675                                             IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ),\r
676                                             &pKeepAliveJob );\r
677     IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
678 \r
679     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
680 \r
681     /* Determine whether to send a PINGREQ or check for PINGRESP. */\r
682     if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs )\r
683     {\r
684         IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );\r
685 \r
686         /* Because PINGREQ may be used to keep the MQTT connection alive, it is\r
687          * more important than other operations. Bypass the queue of jobs for\r
688          * operations by directly sending the PINGREQ in this job. */\r
689         bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
690                                                               pMqttConnection->pPingreqPacket,\r
691                                                               pMqttConnection->pingreqPacketSize );\r
692 \r
693         if( bytesSent != pMqttConnection->pingreqPacketSize )\r
694         {\r
695             IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );\r
696             status = false;\r
697         }\r
698         else\r
699         {\r
700             /* Assume the keep-alive will fail. The network receive callback will\r
701              * clear the failure flag upon receiving a PINGRESP. */\r
702             pMqttConnection->keepAliveFailure = true;\r
703 \r
704             /* Schedule a check for PINGRESP. */\r
705             pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS;\r
706 \r
707             IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",\r
708                          pMqttConnection,\r
709                          IOT_MQTT_RESPONSE_WAIT_MS );\r
710         }\r
711     }\r
712     else\r
713     {\r
714         IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );\r
715 \r
716         if( pMqttConnection->keepAliveFailure == false )\r
717         {\r
718             IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );\r
719 \r
720             /* PINGRESP was received. Schedule the next PINGREQ transmission. */\r
721             pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
722         }\r
723         else\r
724         {\r
725             IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.",\r
726                          pMqttConnection,\r
727                          IOT_MQTT_RESPONSE_WAIT_MS );\r
728 \r
729             /* The network receive callback did not clear the failure flag. */\r
730             status = false;\r
731         }\r
732     }\r
733 \r
734     /* When a PINGREQ is successfully sent, reschedule this job to check for a\r
735      * response shortly. */\r
736     if( status == true )\r
737     {\r
738         taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,\r
739                                                        pKeepAliveJob,\r
740                                                        pMqttConnection->nextKeepAliveMs );\r
741 \r
742         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
743         {\r
744             IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.",\r
745                          pMqttConnection,\r
746                          IOT_MQTT_RESPONSE_WAIT_MS );\r
747         }\r
748         else\r
749         {\r
750             IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.",\r
751                          pMqttConnection,\r
752                          IotTaskPool_strerror( taskPoolStatus ) );\r
753 \r
754             status = false;\r
755         }\r
756     }\r
757     else\r
758     {\r
759         EMPTY_ELSE_MARKER;\r
760     }\r
761 \r
762     /* Close the connection on failures. */\r
763     if( status == false )\r
764     {\r
765         _IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT,\r
766                                          pMqttConnection );\r
767     }\r
768     else\r
769     {\r
770         EMPTY_ELSE_MARKER;\r
771     }\r
772 \r
773     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
774 }\r
775 \r
776 /*-----------------------------------------------------------*/\r
777 \r
778 void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,\r
779                                       IotTaskPoolJob_t pPublishJob,\r
780                                       void * pContext )\r
781 {\r
782     _mqttOperation_t * pOperation = pContext;\r
783     IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL };\r
784 \r
785     /* Check parameters. The task pool and job parameter is not used when asserts\r
786      * are disabled. */\r
787     ( void ) pTaskPool;\r
788     ( void ) pPublishJob;\r
789     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
790     IotMqtt_Assert( pOperation->incomingPublish == true );\r
791     IotMqtt_Assert( pPublishJob == pOperation->job );\r
792 \r
793     /* Remove the operation from the pending processing list. */\r
794     IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) );\r
795 \r
796     if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
797     {\r
798         IotListDouble_Remove( &( pOperation->link ) );\r
799     }\r
800     else\r
801     {\r
802         EMPTY_ELSE_MARKER;\r
803     }\r
804 \r
805     IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) );\r
806 \r
807     /* Process the current PUBLISH. */\r
808     callbackParam.u.message.info = pOperation->u.publish.publishInfo;\r
809 \r
810     _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,\r
811                                          &callbackParam );\r
812 \r
813     /* Free any buffers associated with the current PUBLISH message. */\r
814     if( pOperation->u.publish.pReceivedData != NULL )\r
815     {\r
816         IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );\r
817     }\r
818     else\r
819     {\r
820         EMPTY_ELSE_MARKER;\r
821     }\r
822 \r
823     /* Free the incoming PUBLISH operation. */\r
824     IotMqtt_FreeOperation( pOperation );\r
825 }\r
826 \r
827 /*-----------------------------------------------------------*/\r
828 \r
829 void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,\r
830                            IotTaskPoolJob_t pSendJob,\r
831                            void * pContext )\r
832 {\r
833     size_t bytesSent = 0;\r
834     bool destroyOperation = false, waitable = false, networkPending = false;\r
835     _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;\r
836     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
837 \r
838     /* Check parameters. The task pool and job parameter is not used when asserts\r
839      * are disabled. */\r
840     ( void ) pTaskPool;\r
841     ( void ) pSendJob;\r
842     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
843     IotMqtt_Assert( pSendJob == pOperation->job );\r
844 \r
845     /* The given operation must have an allocated packet and be waiting for a status. */\r
846     IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
847     IotMqtt_Assert( pOperation->u.operation.packetSize != 0 );\r
848     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
849 \r
850     /* Check if this operation is waitable. */\r
851     waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
852 \r
853     /* Check PUBLISH retry counts and limits. */\r
854     if( pOperation->u.operation.retry.limit > 0 )\r
855     {\r
856         if( _checkRetryLimit( pOperation ) == false )\r
857         {\r
858             pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE;\r
859         }\r
860         else\r
861         {\r
862             EMPTY_ELSE_MARKER;\r
863         }\r
864     }\r
865     else\r
866     {\r
867         EMPTY_ELSE_MARKER;\r
868     }\r
869 \r
870     /* Send an operation that is waiting for a response. */\r
871     if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
872     {\r
873         IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.",\r
874                      pMqttConnection,\r
875                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
876                      pOperation );\r
877 \r
878         /* Transmit the MQTT packet from the operation over the network. */\r
879         bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
880                                                               pOperation->u.operation.pMqttPacket,\r
881                                                               pOperation->u.operation.packetSize );\r
882 \r
883         /* Check transmission status. */\r
884         if( bytesSent != pOperation->u.operation.packetSize )\r
885         {\r
886             pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR;\r
887         }\r
888         else\r
889         {\r
890             /* DISCONNECT operations are considered successful upon successful\r
891              * transmission. In addition, non-waitable operations with no callback\r
892              * may also be considered successful. */\r
893             if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT )\r
894             {\r
895                 /* DISCONNECT operations are always waitable. */\r
896                 IotMqtt_Assert( waitable == true );\r
897 \r
898                 pOperation->u.operation.status = IOT_MQTT_SUCCESS;\r
899             }\r
900             else if( waitable == false )\r
901             {\r
902                 if( pOperation->u.operation.notify.callback.function == NULL )\r
903                 {\r
904                     pOperation->u.operation.status = IOT_MQTT_SUCCESS;\r
905                 }\r
906                 else\r
907                 {\r
908                     EMPTY_ELSE_MARKER;\r
909                 }\r
910             }\r
911             else\r
912             {\r
913                 EMPTY_ELSE_MARKER;\r
914             }\r
915         }\r
916     }\r
917     else\r
918     {\r
919         EMPTY_ELSE_MARKER;\r
920     }\r
921 \r
922     /* Check if this operation requires further processing. */\r
923     if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
924     {\r
925         /* Check if this operation should be scheduled for retransmission. */\r
926         if( pOperation->u.operation.retry.limit > 0 )\r
927         {\r
928             if( _scheduleNextRetry( pOperation ) == false )\r
929             {\r
930                 pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR;\r
931             }\r
932             else\r
933             {\r
934                 /* A successfully scheduled PUBLISH retry is awaiting a response\r
935                  * from the network. */\r
936                 networkPending = true;\r
937             }\r
938         }\r
939         else\r
940         {\r
941             /* Decrement reference count to signal completion of send job. Check\r
942              * if the operation should be destroyed. */\r
943             if( waitable == true )\r
944             {\r
945                 destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );\r
946             }\r
947             else\r
948             {\r
949                 EMPTY_ELSE_MARKER;\r
950             }\r
951 \r
952             /* If the operation should not be destroyed, transfer it from the\r
953              * pending processing to the pending response list. */\r
954             if( destroyOperation == false )\r
955             {\r
956                 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
957 \r
958                 /* Operation must be linked. */\r
959                 IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) );\r
960 \r
961                 /* Transfer to pending response list. */\r
962                 IotListDouble_Remove( &( pOperation->link ) );\r
963                 IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),\r
964                                           &( pOperation->link ) );\r
965 \r
966                 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
967 \r
968                 /* This operation is now awaiting a response from the network. */\r
969                 networkPending = true;\r
970             }\r
971             else\r
972             {\r
973                 EMPTY_ELSE_MARKER;\r
974             }\r
975         }\r
976     }\r
977     else\r
978     {\r
979         EMPTY_ELSE_MARKER;\r
980     }\r
981 \r
982     /* Destroy the operation or notify of completion if necessary. */\r
983     if( destroyOperation == true )\r
984     {\r
985         _IotMqtt_DestroyOperation( pOperation );\r
986     }\r
987     else\r
988     {\r
989         /* Do not check the operation status if a network response is pending,\r
990          * since a network response could modify the status. */\r
991         if( networkPending == false )\r
992         {\r
993             /* Notify of operation completion if this job set a status. */\r
994             if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )\r
995             {\r
996                 _IotMqtt_Notify( pOperation );\r
997             }\r
998             else\r
999             {\r
1000                 EMPTY_ELSE_MARKER;\r
1001             }\r
1002         }\r
1003         else\r
1004         {\r
1005             EMPTY_ELSE_MARKER;\r
1006         }\r
1007     }\r
1008 }\r
1009 \r
1010 /*-----------------------------------------------------------*/\r
1011 \r
1012 void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,\r
1013                                          IotTaskPoolJob_t pOperationJob,\r
1014                                          void * pContext )\r
1015 {\r
1016     _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;\r
1017     IotMqttCallbackParam_t callbackParam = { 0 };\r
1018 \r
1019     /* Check parameters. The task pool and job parameter is not used when asserts\r
1020      * are disabled. */\r
1021     ( void ) pTaskPool;\r
1022     ( void ) pOperationJob;\r
1023     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
1024     IotMqtt_Assert( pOperationJob == pOperation->job );\r
1025 \r
1026     /* The operation's callback function and status must be set. */\r
1027     IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL );\r
1028     IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );\r
1029 \r
1030     callbackParam.mqttConnection = pOperation->pMqttConnection;\r
1031     callbackParam.u.operation.type = pOperation->u.operation.type;\r
1032     callbackParam.u.operation.reference = pOperation;\r
1033     callbackParam.u.operation.result = pOperation->u.operation.status;\r
1034 \r
1035     /* Invoke the user callback function. */\r
1036     pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,\r
1037                                                       &callbackParam );\r
1038 \r
1039     /* Attempt to destroy the operation once the user callback returns. */\r
1040     if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )\r
1041     {\r
1042         _IotMqtt_DestroyOperation( pOperation );\r
1043     }\r
1044     else\r
1045     {\r
1046         EMPTY_ELSE_MARKER;\r
1047     }\r
1048 }\r
1049 \r
1050 /*-----------------------------------------------------------*/\r
1051 \r
1052 IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,\r
1053                                            IotTaskPoolRoutine_t jobRoutine,\r
1054                                            uint32_t delay )\r
1055 {\r
1056     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
1057     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
1058 \r
1059     /* Check that job routine is valid. */\r
1060     IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) ||\r
1061                     ( jobRoutine == _IotMqtt_ProcessCompletedOperation ) ||\r
1062                     ( jobRoutine == _IotMqtt_ProcessIncomingPublish ) );\r
1063 \r
1064     /* Creating a new job should never fail when parameters are valid. */\r
1065     taskPoolStatus = IotTaskPool_CreateJob( jobRoutine,\r
1066                                             pOperation,\r
1067                                             &( pOperation->jobStorage ),\r
1068                                             &( pOperation->job ) );\r
1069     IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
1070 \r
1071     /* Schedule the new job with a delay. */\r
1072     taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
1073                                                    pOperation->job,\r
1074                                                    delay );\r
1075 \r
1076     if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
1077     {\r
1078         /* Scheduling a newly-created job should never be invalid or illegal. */\r
1079         IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER );\r
1080         IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION );\r
1081 \r
1082         IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.",\r
1083                     pOperation->pMqttConnection,\r
1084                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
1085                     pOperation,\r
1086                     IotTaskPool_strerror( taskPoolStatus ) );\r
1087 \r
1088         status = IOT_MQTT_SCHEDULING_ERROR;\r
1089     }\r
1090     else\r
1091     {\r
1092         EMPTY_ELSE_MARKER;\r
1093     }\r
1094 \r
1095     return status;\r
1096 }\r
1097 \r
1098 /*-----------------------------------------------------------*/\r
1099 \r
1100 _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,\r
1101                                            IotMqttOperationType_t type,\r
1102                                            const uint16_t * pPacketIdentifier )\r
1103 {\r
1104     bool waitable = false;\r
1105     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
1106     _mqttOperation_t * pResult = NULL;\r
1107     IotLink_t * pResultLink = NULL;\r
1108     _operationMatchParam_t param = { .type = type, .pPacketIdentifier = pPacketIdentifier };\r
1109 \r
1110     if( pPacketIdentifier != NULL )\r
1111     {\r
1112         IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response "\r
1113                      "with packet identifier %hu.",\r
1114                      pMqttConnection,\r
1115                      IotMqtt_OperationType( type ),\r
1116                      *pPacketIdentifier );\r
1117     }\r
1118     else\r
1119     {\r
1120         IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.",\r
1121                      pMqttConnection,\r
1122                      IotMqtt_OperationType( type ) );\r
1123     }\r
1124 \r
1125     /* Find and remove the first matching element in the list. */\r
1126     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
1127     pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ),\r
1128                                                 NULL,\r
1129                                                 _mqttOperation_match,\r
1130                                                 &param );\r
1131 \r
1132     /* Check if a match was found. */\r
1133     if( pResultLink != NULL )\r
1134     {\r
1135         /* Get operation pointer and check if it is waitable. */\r
1136         pResult = IotLink_Container( _mqttOperation_t, pResultLink, link );\r
1137         waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
1138 \r
1139         /* Check if the matched operation is a PUBLISH with retry. If it is, cancel\r
1140          * the retry job. */\r
1141         if( pResult->u.operation.retry.limit > 0 )\r
1142         {\r
1143             taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
1144                                                     pResult->job,\r
1145                                                     NULL );\r
1146 \r
1147             /* If the retry job could not be canceled, then it is currently\r
1148              * executing. Ignore the operation. */\r
1149             if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
1150             {\r
1151                 pResult = NULL;\r
1152             }\r
1153             else\r
1154             {\r
1155                 /* Check job reference counts. A waitable operation should have a\r
1156                  * count of 2; a non-waitable operation should have a count of 1. */\r
1157                 IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) );\r
1158             }\r
1159         }\r
1160         else\r
1161         {\r
1162             /* An operation with no retry in the pending responses list should\r
1163              * always have a job reference of 1. */\r
1164             IotMqtt_Assert( pResult->u.operation.jobReference == 1 );\r
1165 \r
1166             /* Increment job references of a waitable operation to prevent Wait from\r
1167              * destroying this operation if it times out. */\r
1168             if( waitable == true )\r
1169             {\r
1170                 ( pResult->u.operation.jobReference )++;\r
1171 \r
1172                 IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.",\r
1173                              pMqttConnection,\r
1174                              IotMqtt_OperationType( type ),\r
1175                              pResult,\r
1176                              ( long int ) ( pResult->u.operation.jobReference - 1 ),\r
1177                              ( long int ) ( pResult->u.operation.jobReference ) );\r
1178             }\r
1179         }\r
1180     }\r
1181     else\r
1182     {\r
1183         EMPTY_ELSE_MARKER;\r
1184     }\r
1185 \r
1186     if( pResult != NULL )\r
1187     {\r
1188         IotLogDebug( "(MQTT connection %p) Found operation %s." ,\r
1189                      pMqttConnection,\r
1190                      IotMqtt_OperationType( type ) );\r
1191 \r
1192         /* Remove the matched operation from the list. */\r
1193         IotListDouble_Remove( &( pResult->link ) );\r
1194     }\r
1195     else\r
1196     {\r
1197         IotLogDebug( "(MQTT connection %p) Operation %s not found.",\r
1198                      pMqttConnection,\r
1199                      IotMqtt_OperationType( type ) );\r
1200     }\r
1201 \r
1202     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
1203 \r
1204     return pResult;\r
1205 }\r
1206 \r
1207 /*-----------------------------------------------------------*/\r
1208 \r
1209 void _IotMqtt_Notify( _mqttOperation_t * pOperation )\r
1210 {\r
1211     IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR;\r
1212     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
1213 \r
1214     /* Check if operation is waitable. */\r
1215     bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
1216 \r
1217     /* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected\r
1218      * subscriptions are removed by the deserializer, so not removed here. */\r
1219     if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
1220     {\r
1221         switch( pOperation->u.operation.status )\r
1222         {\r
1223             case IOT_MQTT_SUCCESS:\r
1224                 break;\r
1225 \r
1226             case IOT_MQTT_SERVER_REFUSED:\r
1227                 break;\r
1228 \r
1229             default:\r
1230                 _IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection,\r
1231                                                      pOperation->u.operation.packetIdentifier,\r
1232                                                      -1 );\r
1233                 break;\r
1234         }\r
1235     }\r
1236     else\r
1237     {\r
1238         EMPTY_ELSE_MARKER;\r
1239     }\r
1240 \r
1241     /* Schedule callback invocation for non-waitable operation. */\r
1242     if( waitable == false )\r
1243     {\r
1244         /* Non-waitable operation should have job reference of 1. */\r
1245         IotMqtt_Assert( pOperation->u.operation.jobReference == 1 );\r
1246 \r
1247         /* Schedule an invocation of the callback. */\r
1248         if( pOperation->u.operation.notify.callback.function != NULL )\r
1249         {\r
1250             IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
1251 \r
1252             status = _IotMqtt_ScheduleOperation( pOperation,\r
1253                                                  _IotMqtt_ProcessCompletedOperation,\r
1254                                                  0 );\r
1255 \r
1256             if( status == IOT_MQTT_SUCCESS )\r
1257             {\r
1258                 IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.",\r
1259                              pOperation->pMqttConnection,\r
1260                              IotMqtt_OperationType( pOperation->u.operation.type ),\r
1261                              pOperation );\r
1262 \r
1263                 /* Place the scheduled operation back in the list of operations pending\r
1264                  * processing. */\r
1265                 if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
1266                 {\r
1267                     IotListDouble_Remove( &( pOperation->link ) );\r
1268                 }\r
1269                 else\r
1270                 {\r
1271                     EMPTY_ELSE_MARKER;\r
1272                 }\r
1273 \r
1274                 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
1275                                           &( pOperation->link ) );\r
1276             }\r
1277             else\r
1278             {\r
1279                 IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.",\r
1280                             pOperation->pMqttConnection,\r
1281                             IotMqtt_OperationType( pOperation->u.operation.type ),\r
1282                             pOperation );\r
1283             }\r
1284 \r
1285             IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
1286         }\r
1287         else\r
1288         {\r
1289             EMPTY_ELSE_MARKER;\r
1290         }\r
1291     }\r
1292     else\r
1293     {\r
1294         EMPTY_ELSE_MARKER;\r
1295     }\r
1296 \r
1297     /* Operations that weren't scheduled may be destroyed. */\r
1298     if( status == IOT_MQTT_SCHEDULING_ERROR )\r
1299     {\r
1300         /* Decrement reference count of operations not scheduled. */\r
1301         if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )\r
1302         {\r
1303             _IotMqtt_DestroyOperation( pOperation );\r
1304         }\r
1305         else\r
1306         {\r
1307             EMPTY_ELSE_MARKER;\r
1308         }\r
1309 \r
1310         /* Post to a waitable operation's semaphore. */\r
1311         if( waitable == true )\r
1312         {\r
1313             IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "\r
1314                          "notified of completion.",\r
1315                          pOperation->pMqttConnection,\r
1316                          IotMqtt_OperationType( pOperation->u.operation.type ),\r
1317                          pOperation );\r
1318 \r
1319             IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );\r
1320         }\r
1321         else\r
1322         {\r
1323             EMPTY_ELSE_MARKER;\r
1324         }\r
1325     }\r
1326     else\r
1327     {\r
1328         IotMqtt_Assert( status == IOT_MQTT_SUCCESS );\r
1329     }\r
1330 }\r
1331 \r
1332 /*-----------------------------------------------------------*/\r