]> git.sur5r.net Git - freertos/blob - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
Correct an err in queue.c introduced when previously updating behaviour when queue...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_operation.c
1 /*\r
2  * Amazon FreeRTOS MQTT V2.0.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://aws.amazon.com/freertos\r
23  * http://www.FreeRTOS.org\r
24  */\r
25 \r
26 /**\r
27  * @file iot_mqtt_operation.c\r
28  * @brief Implements functions that process MQTT operations.\r
29  */\r
30 \r
31 /* The config header is always included first. */\r
32 #include "iot_config.h"\r
33 \r
34 /* Standard includes. */\r
35 #include <string.h>\r
36 \r
37 /* Error handling include. */\r
38 #include "private/iot_error.h"\r
39 \r
40 /* MQTT internal include. */\r
41 #include "private/iot_mqtt_internal.h"\r
42 \r
43 /* Platform layer includes. */\r
44 #include "platform/iot_clock.h"\r
45 #include "platform/iot_threads.h"\r
46 \r
47 /* Atomics include. */\r
48 #include "iot_atomic.h"\r
49 \r
50 /*-----------------------------------------------------------*/\r
51 \r
52 /**\r
53  * @brief First parameter to #_mqttOperation_match.\r
54  */\r
55 typedef struct _operationMatchParam\r
56 {\r
57     IotMqttOperationType_t type;        /**< @brief The type of operation to look for. */\r
58     const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation.\r
59                                          * Set to `NULL` to ignore packet identifier. */\r
60 } _operationMatchParam_t;\r
61 \r
62 /*-----------------------------------------------------------*/\r
63 \r
64 /**\r
65  * @brief Match an MQTT operation by type and packet identifier.\r
66  *\r
67  * @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t.\r
68  * @param[in] pMatch Pointer to an #_operationMatchParam_t.\r
69  *\r
 * @return `true` if the operation matches the parameters in `pMatch`; `false`
 * otherwise.
72  */\r
73 static bool _mqttOperation_match( const IotLink_t * pOperationLink,\r
74                                   void * pMatch );\r
75 \r
76 /**\r
77  * @brief Check if an operation with retry has exceeded its retry limit.\r
78  *\r
79  * If a PUBLISH operation is available for retry, this function also sets any\r
80  * necessary DUP flags.\r
81  *\r
82  * @param[in] pOperation The operation to check.\r
83  *\r
84  * @return `true` if the operation may be retried; `false` otherwise.\r
85  */\r
86 static bool _checkRetryLimit( _mqttOperation_t * pOperation );\r
87 \r
88 /**\r
89  * @brief Schedule the next send of an operation with retry.\r
90  *\r
91  * @param[in] pOperation The operation to schedule.\r
92  *\r
93  * @return `true` if the reschedule succeeded; `false` otherwise.\r
94  */\r
95 static bool _scheduleNextRetry( _mqttOperation_t * pOperation );\r
96 \r
97 /*-----------------------------------------------------------*/\r
98 \r
99 static bool _mqttOperation_match( const IotLink_t * pOperationLink,\r
100                                   void * pMatch )\r
101 {\r
102     bool match = false;\r
103 \r
104     /* Because this function is called from a container function, the given link\r
105      * must never be NULL. */\r
106     IotMqtt_Assert( pOperationLink != NULL );\r
107 \r
108     _mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t,\r
109                                                        pOperationLink,\r
110                                                        link );\r
111     _operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch;\r
112 \r
113     /* Check for matching operations. */\r
114     if( pParam->type == pOperation->u.operation.type )\r
115     {\r
116         /* Check for matching packet identifiers. */\r
117         if( pParam->pPacketIdentifier == NULL )\r
118         {\r
119             match = true;\r
120         }\r
121         else\r
122         {\r
123             match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier );\r
124         }\r
125     }\r
126     else\r
127     {\r
128         EMPTY_ELSE_MARKER;\r
129     }\r
130 \r
131     return match;\r
132 }\r
133 \r
134 /*-----------------------------------------------------------*/\r
135 \r
136 static bool _checkRetryLimit( _mqttOperation_t * pOperation )\r
137 {\r
138     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
139     bool status = true, setDup = false;\r
140 \r
141     /* Choose a set DUP function. */\r
142     void ( * publishSetDup )( uint8_t *,\r
143                               uint8_t *,\r
144                               uint16_t * ) = _IotMqtt_PublishSetDup;\r
145 \r
146     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
147         if( pMqttConnection->pSerializer != NULL )\r
148         {\r
149             if( pMqttConnection->pSerializer->serialize.publishSetDup != NULL )\r
150             {\r
151                 publishSetDup = pMqttConnection->pSerializer->serialize.publishSetDup;\r
152             }\r
153             else\r
154             {\r
155                 EMPTY_ELSE_MARKER;\r
156             }\r
157         }\r
158         else\r
159         {\r
160             EMPTY_ELSE_MARKER;\r
161         }\r
162     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
163 \r
164     /* Only PUBLISH may be retried. */\r
165     IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );\r
166 \r
167     /* Check if the retry limit is exhausted. */\r
168     if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit )\r
169     {\r
170         /* The retry count may be at most one more than the retry limit, which\r
171          * accounts for the final check for a PUBACK. */\r
172         IotMqtt_Assert( pOperation->u.operation.periodic.retry.count ==\r
173                         pOperation->u.operation.periodic.retry.limit + 1 );\r
174 \r
175         IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",\r
176                      pMqttConnection,\r
177                      pOperation,\r
178                      pOperation->u.operation.periodic.retry.limit );\r
179 \r
180         status = false;\r
181     }\r
182     else\r
183     {\r
184         if( pOperation->u.operation.periodic.retry.count == 1 )\r
185         {\r
186             /* The DUP flag should always be set on the first retry. */\r
187             setDup = true;\r
188         }\r
189         else if( pMqttConnection->awsIotMqttMode == true )\r
190         {\r
191             /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet\r
192              * identifier) must be reset on every retry. */\r
193             setDup = true;\r
194         }\r
195         else\r
196         {\r
197             EMPTY_ELSE_MARKER;\r
198         }\r
199 \r
200         if( setDup == true )\r
201         {\r
202             /* In AWS IoT MQTT mode, the references mutex must be locked to\r
203              * prevent the packet identifier from being read while it is being\r
204              * changed. */\r
205             if( pMqttConnection->awsIotMqttMode == true )\r
206             {\r
207                 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
208             }\r
209             else\r
210             {\r
211                 EMPTY_ELSE_MARKER;\r
212             }\r
213 \r
214             /* Always set the DUP flag on the first retry. */\r
215             publishSetDup( pOperation->u.operation.pMqttPacket,\r
216                            pOperation->u.operation.pPacketIdentifierHigh,\r
217                            &( pOperation->u.operation.packetIdentifier ) );\r
218 \r
219             if( pMqttConnection->awsIotMqttMode == true )\r
220             {\r
221                 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
222             }\r
223             else\r
224             {\r
225                 EMPTY_ELSE_MARKER;\r
226             }\r
227         }\r
228         else\r
229         {\r
230             EMPTY_ELSE_MARKER;\r
231         }\r
232     }\r
233 \r
234     return status;\r
235 }\r
236 \r
237 /*-----------------------------------------------------------*/\r
238 \r
239 static bool _scheduleNextRetry( _mqttOperation_t * pOperation )\r
240 {\r
241     bool firstRetry = false;\r
242     uint32_t scheduleDelay = 0;\r
243     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
244     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
245 \r
246     /* This function should never be called with retry count greater than\r
247      * retry limit. */\r
248     IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <=\r
249                     pOperation->u.operation.periodic.retry.limit );\r
250 \r
251     /* Increment the retry count. */\r
252     ( pOperation->u.operation.periodic.retry.count )++;\r
253 \r
254     /* Check for a response shortly for the final retry. Otherwise, calculate the\r
255      * next retry period. */\r
256     if( pOperation->u.operation.periodic.retry.count >\r
257         pOperation->u.operation.periodic.retry.limit )\r
258     {\r
259         scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;\r
260 \r
261         IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check "\r
262                      "for response in %d ms.",\r
263                      pMqttConnection,\r
264                      pOperation,\r
265                      IOT_MQTT_RESPONSE_WAIT_MS );\r
266     }\r
267     else\r
268     {\r
269         scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs;\r
270 \r
271         /* Double the retry period, subject to a ceiling value. */\r
272         pOperation->u.operation.periodic.retry.nextPeriodMs *= 2;\r
273 \r
274         if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING )\r
275         {\r
276             pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING;\r
277         }\r
278         else\r
279         {\r
280             EMPTY_ELSE_MARKER;\r
281         }\r
282 \r
283         IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",\r
284                      pMqttConnection,\r
285                      pOperation,\r
286                      ( unsigned long ) pOperation->u.operation.periodic.retry.count,\r
287                      ( unsigned long ) pOperation->u.operation.periodic.retry.limit,\r
288                      ( unsigned long ) scheduleDelay );\r
289 \r
290         /* Check if this is the first retry. */\r
291         firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 );\r
292 \r
293         /* On the first retry, the PUBLISH will be moved from the pending processing\r
294          * list to the pending responses list. Lock the connection references mutex\r
295          * to manipulate the lists. */\r
296         if( firstRetry == true )\r
297         {\r
298             IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
299         }\r
300         else\r
301         {\r
302             EMPTY_ELSE_MARKER;\r
303         }\r
304     }\r
305 \r
306     /* Reschedule the PUBLISH for another send. */\r
307     status = _IotMqtt_ScheduleOperation( pOperation,\r
308                                          _IotMqtt_ProcessSend,\r
309                                          scheduleDelay );\r
310 \r
311     /* Check for successful reschedule. */\r
312     if( status == IOT_MQTT_SUCCESS )\r
313     {\r
314         /* Move a successfully rescheduled PUBLISH from the pending processing\r
315          * list to the pending responses list on the first retry. */\r
316         if( firstRetry == true )\r
317         {\r
318             /* Operation must be linked. */\r
319             IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) == true );\r
320 \r
321             /* Transfer to pending response list. */\r
322             IotListDouble_Remove( &( pOperation->link ) );\r
323             IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),\r
324                                       &( pOperation->link ) );\r
325         }\r
326         else\r
327         {\r
328             EMPTY_ELSE_MARKER;\r
329         }\r
330     }\r
331     else\r
332     {\r
333         EMPTY_ELSE_MARKER;\r
334     }\r
335 \r
336     /* The references mutex only needs to be unlocked on the first retry, since\r
337      * only the first retry manipulates the connection lists. */\r
338     if( firstRetry == true )\r
339     {\r
340         IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
341     }\r
342     else\r
343     {\r
344         EMPTY_ELSE_MARKER;\r
345     }\r
346 \r
347     return( status == IOT_MQTT_SUCCESS );\r
348 }\r
349 \r
350 /*-----------------------------------------------------------*/\r
351 \r
352 IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,\r
353                                          uint32_t flags,\r
354                                          const IotMqttCallbackInfo_t * pCallbackInfo,\r
355                                          _mqttOperation_t ** pNewOperation )\r
356 {\r
357     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
358     bool decrementOnError = false;\r
359     _mqttOperation_t * pOperation = NULL;\r
360     bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );\r
361 \r
362     /* If the waitable flag is set, make sure that there's no callback. */\r
363     if( waitable == true )\r
364     {\r
365         if( pCallbackInfo != NULL )\r
366         {\r
367             IotLogError( "Callback should not be set for a waitable operation." );\r
368 \r
369             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );\r
370         }\r
371         else\r
372         {\r
373             EMPTY_ELSE_MARKER;\r
374         }\r
375     }\r
376     else\r
377     {\r
378         EMPTY_ELSE_MARKER;\r
379     }\r
380 \r
381     IotLogDebug( "(MQTT connection %p) Creating new operation record.",\r
382                  pMqttConnection );\r
383 \r
384     /* Increment the reference count for the MQTT connection when creating a new\r
385      * operation. */\r
386     if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false )\r
387     {\r
388         IotLogError( "(MQTT connection %p) New operation record cannot be created"\r
389                      " for a closed connection",\r
390                      pMqttConnection );\r
391 \r
392         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );\r
393     }\r
394     else\r
395     {\r
396         /* Reference count will need to be decremented on error. */\r
397         decrementOnError = true;\r
398     }\r
399 \r
400     /* Allocate memory for a new operation. */\r
401     pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );\r
402 \r
403     if( pOperation == NULL )\r
404     {\r
405         IotLogError( "(MQTT connection %p) Failed to allocate memory for new "\r
406                      "operation record.",\r
407                      pMqttConnection );\r
408 \r
409         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
410     }\r
411     else\r
412     {\r
413         /* Clear the operation data. */\r
414         ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );\r
415 \r
416         /* Initialize some members of the new operation. */\r
417         pOperation->pMqttConnection = pMqttConnection;\r
418         pOperation->u.operation.jobReference = 1;\r
419         pOperation->u.operation.flags = flags;\r
420         pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING;\r
421     }\r
422 \r
423     /* Check if the waitable flag is set. If it is, create a semaphore to\r
424      * wait on. */\r
425     if( waitable == true )\r
426     {\r
427         /* Create a semaphore to wait on for a waitable operation. */\r
428         if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false )\r
429         {\r
430             IotLogError( "(MQTT connection %p) Failed to create semaphore for "\r
431                          "waitable operation.",\r
432                          pMqttConnection );\r
433 \r
434             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
435         }\r
436         else\r
437         {\r
438             /* A waitable operation is created with an additional reference for the\r
439              * Wait function. */\r
440             ( pOperation->u.operation.jobReference )++;\r
441         }\r
442     }\r
443     else\r
444     {\r
445         /* If the waitable flag isn't set but a callback is, copy the callback\r
446          * information. */\r
447         if( pCallbackInfo != NULL )\r
448         {\r
449             pOperation->u.operation.notify.callback = *pCallbackInfo;\r
450         }\r
451         else\r
452         {\r
453             EMPTY_ELSE_MARKER;\r
454         }\r
455     }\r
456 \r
457     /* Add this operation to the MQTT connection's operation list. */\r
458     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
459     IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
460                               &( pOperation->link ) );\r
461     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
462 \r
463     /* Set the output parameter. */\r
464     *pNewOperation = pOperation;\r
465 \r
466     /* Clean up operation and decrement reference count if this function failed. */\r
467     IOT_FUNCTION_CLEANUP_BEGIN();\r
468 \r
469     if( status != IOT_MQTT_SUCCESS )\r
470     {\r
471         if( decrementOnError == true )\r
472         {\r
473             _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
474         }\r
475         else\r
476         {\r
477             EMPTY_ELSE_MARKER;\r
478         }\r
479 \r
480         if( pOperation != NULL )\r
481         {\r
482             IotMqtt_FreeOperation( pOperation );\r
483         }\r
484         else\r
485         {\r
486             EMPTY_ELSE_MARKER;\r
487         }\r
488     }\r
489     else\r
490     {\r
491         EMPTY_ELSE_MARKER;\r
492     }\r
493 \r
494     IOT_FUNCTION_CLEANUP_END();\r
495 }\r
496 \r
497 /*-----------------------------------------------------------*/\r
498 \r
499 bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,\r
500                                             bool cancelJob )\r
501 {\r
502     bool destroyOperation = false;\r
503     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
504     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
505 \r
506     /* Attempt to cancel the operation's job. */\r
507     if( cancelJob == true )\r
508     {\r
509         taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
510                                                 pOperation->job,\r
511                                                 NULL );\r
512 \r
513         /* If the operation's job was not canceled, it must be already executing.\r
514          * Any other return value is invalid. */\r
515         IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
516                         ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
517 \r
518         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
519         {\r
520             IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.",\r
521                          pMqttConnection,\r
522                          IotMqtt_OperationType( pOperation->u.operation.type ),\r
523                          pOperation );\r
524         }\r
525         else\r
526         {\r
527             EMPTY_ELSE_MARKER;\r
528         }\r
529     }\r
530     else\r
531     {\r
532         EMPTY_ELSE_MARKER;\r
533     }\r
534 \r
535     /* Decrement job reference count. */\r
536     if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
537     {\r
538         IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
539         pOperation->u.operation.jobReference--;\r
540 \r
541         IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed"\r
542                      " from %ld to %ld.",\r
543                      pMqttConnection,\r
544                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
545                      pOperation,\r
546                      ( long ) ( pOperation->u.operation.jobReference + 1 ),\r
547                      ( long ) ( pOperation->u.operation.jobReference ) );\r
548 \r
549         /* The job reference count must be 0 or 1 after the decrement. */\r
550         IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||\r
551                         ( pOperation->u.operation.jobReference == 1 ) );\r
552 \r
553         /* This operation may be destroyed if its reference count is 0. */\r
554         if( pOperation->u.operation.jobReference == 0 )\r
555         {\r
556             destroyOperation = true;\r
557         }\r
558         else\r
559         {\r
560             EMPTY_ELSE_MARKER;\r
561         }\r
562 \r
563         IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
564     }\r
565     else\r
566     {\r
567         EMPTY_ELSE_MARKER;\r
568     }\r
569 \r
570     return destroyOperation;\r
571 }\r
572 \r
573 /*-----------------------------------------------------------*/\r
574 \r
575 void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation )\r
576 {\r
577     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
578 \r
579     /* Default free packet function. */\r
580     void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
581 \r
582     IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.",\r
583                  pMqttConnection,\r
584                  IotMqtt_OperationType( pOperation->u.operation.type ),\r
585                  pOperation );\r
586 \r
587     /* The job reference count must be between 0 and 2. */\r
588     IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) &&\r
589                     ( pOperation->u.operation.jobReference <= 2 ) );\r
590 \r
591     /* Jobs to be destroyed should be removed from the MQTT connection's\r
592      * lists. */\r
593     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
594 \r
595     if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
596     {\r
597         IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.",\r
598                      pMqttConnection,\r
599                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
600                      pOperation,\r
601                      pMqttConnection );\r
602 \r
603         IotListDouble_Remove( &( pOperation->link ) );\r
604     }\r
605     else\r
606     {\r
607         IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.",\r
608                      pMqttConnection,\r
609                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
610                      pOperation );\r
611     }\r
612 \r
613     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
614 \r
615     /* Free any allocated MQTT packet. */\r
616     if( pOperation->u.operation.pMqttPacket != NULL )\r
617     {\r
618         #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
619             if( pMqttConnection->pSerializer != NULL )\r
620             {\r
621                 if( pMqttConnection->pSerializer->freePacket != NULL )\r
622                 {\r
623                     freePacket = pMqttConnection->pSerializer->freePacket;\r
624                 }\r
625                 else\r
626                 {\r
627                     EMPTY_ELSE_MARKER;\r
628                 }\r
629             }\r
630             else\r
631             {\r
632                 EMPTY_ELSE_MARKER;\r
633             }\r
634         #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
635 \r
636         freePacket( pOperation->u.operation.pMqttPacket );\r
637 \r
638         IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.",\r
639                      pMqttConnection,\r
640                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
641                      pOperation );\r
642     }\r
643     else\r
644     {\r
645         IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.",\r
646                      pMqttConnection,\r
647                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
648                      pOperation );\r
649     }\r
650 \r
651     /* Check if a wait semaphore was created for this operation. */\r
652     if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )\r
653     {\r
654         IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) );\r
655 \r
656         IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.",\r
657                      pMqttConnection,\r
658                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
659                      pOperation );\r
660     }\r
661     else\r
662     {\r
663         EMPTY_ELSE_MARKER;\r
664     }\r
665 \r
666     IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.",\r
667                  pMqttConnection,\r
668                  IotMqtt_OperationType( pOperation->u.operation.type ),\r
669                  pOperation );\r
670 \r
671     /* Free the memory used to hold operation data. */\r
672     IotMqtt_FreeOperation( pOperation );\r
673 \r
674     /* Decrement the MQTT connection's reference count after destroying an\r
675      * operation. */\r
676     _IotMqtt_DecrementConnectionReferences( pMqttConnection );\r
677 }\r
678 \r
679 /*-----------------------------------------------------------*/\r
680 \r
681 void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,\r
682                                 IotTaskPoolJob_t pKeepAliveJob,\r
683                                 void * pContext )\r
684 {\r
685     bool status = true;\r
686     uint32_t swapStatus = 0;\r
687     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
688     size_t bytesSent = 0;\r
689 \r
690     /* Swap status is not checked when asserts are disabled. */\r
691     ( void ) swapStatus;\r
692 \r
693     /* Retrieve the MQTT connection from the context. */\r
694     _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;\r
695     _mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq );\r
696 \r
697     /* Check parameters. */\r
698     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
699     IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job );\r
700 \r
701     /* Check that keep-alive interval is valid. The MQTT spec states its maximum\r
702      * value is 65,535 seconds. */\r
703     IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 );\r
704 \r
705     /* Only two values are valid for the next keep alive job delay. */\r
706     IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==\r
707                       pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) ||\r
708                     ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs\r
709                       == IOT_MQTT_RESPONSE_WAIT_MS ) );\r
710 \r
711     IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );\r
712 \r
713     /* Re-create the keep-alive job for rescheduling. This should never fail. */\r
714     taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
715                                             pContext,\r
716                                             IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ),\r
717                                             &pKeepAliveJob );\r
718     IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
719 \r
720     /* Determine whether to send a PINGREQ or check for PINGRESP. */\r
721     if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==\r
722         pPingreqOperation->u.operation.periodic.ping.keepAliveMs )\r
723     {\r
724         IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );\r
725 \r
726         /* Because PINGREQ may be used to keep the MQTT connection alive, it is\r
727          * more important than other operations. Bypass the queue of jobs for\r
728          * operations by directly sending the PINGREQ in this job. */\r
729         bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
730                                                               pPingreqOperation->u.operation.pMqttPacket,\r
731                                                               pPingreqOperation->u.operation.packetSize );\r
732 \r
733         if( bytesSent != pPingreqOperation->u.operation.packetSize )\r
734         {\r
735             IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );\r
736             status = false;\r
737         }\r
738         else\r
739         {\r
740             /* Assume the keep-alive will fail. The network receive callback will\r
741              * clear the failure flag upon receiving a PINGRESP. */\r
742             swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ),\r
743                                                     1,\r
744                                                     0 );\r
745             IotMqtt_Assert( swapStatus == 1 );\r
746 \r
747             /* Schedule a check for PINGRESP. */\r
748             pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS;\r
749 \r
750             IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",\r
751                          pMqttConnection,\r
752                          IOT_MQTT_RESPONSE_WAIT_MS );\r
753         }\r
754     }\r
755     else\r
756     {\r
757         IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );\r
758 \r
759         if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 )\r
760         {\r
761             IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );\r
762 \r
763             /* PINGRESP was received. Schedule the next PINGREQ transmission. */\r
764             pPingreqOperation->u.operation.periodic.ping.nextPeriodMs =\r
765                 pPingreqOperation->u.operation.periodic.ping.keepAliveMs;\r
766         }\r
767         else\r
768         {\r
769             IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.",\r
770                          pMqttConnection,\r
771                          IOT_MQTT_RESPONSE_WAIT_MS );\r
772 \r
773             /* The network receive callback did not clear the failure flag. */\r
774             status = false;\r
775         }\r
776     }\r
777 \r
778     /* When a PINGREQ is successfully sent, reschedule this job to check for a\r
779      * response shortly. */\r
780     if( status == true )\r
781     {\r
782         taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,\r
783                                                        pKeepAliveJob,\r
784                                                        pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );\r
785 \r
786         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
787         {\r
788             IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.",\r
789                          pMqttConnection,\r
790                          ( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );\r
791         }\r
792         else\r
793         {\r
794             IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.",\r
795                          pMqttConnection,\r
796                          IotTaskPool_strerror( taskPoolStatus ) );\r
797 \r
798             status = false;\r
799         }\r
800     }\r
801     else\r
802     {\r
803         EMPTY_ELSE_MARKER;\r
804     }\r
805 \r
806     /* Close the connection on failures. */\r
807     if( status == false )\r
808     {\r
809         _IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT,\r
810                                          pMqttConnection );\r
811     }\r
812     else\r
813     {\r
814         EMPTY_ELSE_MARKER;\r
815     }\r
816 }\r
817 \r
818 /*-----------------------------------------------------------*/\r
819 \r
820 void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,\r
821                                       IotTaskPoolJob_t pPublishJob,\r
822                                       void * pContext )\r
823 {\r
824     _mqttOperation_t * pOperation = pContext;\r
825     IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL };\r
826 \r
827     /* Check parameters. The task pool and job parameter is not used when asserts\r
828      * are disabled. */\r
829     ( void ) pTaskPool;\r
830     ( void ) pPublishJob;\r
831     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
832     IotMqtt_Assert( pOperation->incomingPublish == true );\r
833     IotMqtt_Assert( pPublishJob == pOperation->job );\r
834 \r
835     /* Remove the operation from the pending processing list. */\r
836     IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) );\r
837 \r
838     if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
839     {\r
840         IotListDouble_Remove( &( pOperation->link ) );\r
841     }\r
842     else\r
843     {\r
844         /* This operation may have already been removed by cleanup of pending\r
845          * operations (called from Disconnect). In that case, do nothing here. */\r
846         EMPTY_ELSE_MARKER;\r
847     }\r
848 \r
849     IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) );\r
850 \r
851     /* Process the current PUBLISH. */\r
852     callbackParam.u.message.info = pOperation->u.publish.publishInfo;\r
853 \r
854     _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,\r
855                                          &callbackParam );\r
856 \r
857     /* Free buffers associated with the current PUBLISH message. */\r
858     IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL );\r
859     IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );\r
860 \r
861     /* Free the incoming PUBLISH operation. */\r
862     IotMqtt_FreeOperation( pOperation );\r
863 }\r
864 \r
865 /*-----------------------------------------------------------*/\r
866 \r
/**
 * @brief Task pool routine that transmits an operation's MQTT packet and then
 * decides what happens to the operation.
 *
 * In order, this routine:
 * - applies the retry limit (through _checkRetryLimit) when one is set;
 * - sends the packet through the connection's network interface while the
 *   status is still IOT_MQTT_STATUS_PENDING;
 * - sets IOT_MQTT_NETWORK_ERROR when fewer bytes than the packet size were sent;
 * - sets IOT_MQTT_SUCCESS right after transmission for DISCONNECT and for
 *   non-waitable operations that have no callback.
 *
 * An operation that is still pending afterwards is either scheduled for a
 * retry (through _scheduleNextRetry) or moved to the connection's
 * pendingResponse list. Any other operation is destroyed or handed to
 * _IotMqtt_Notify.
 *
 * @param[in] pTaskPool Task pool running this job; not otherwise used.
 * @param[in] pSendJob Job being executed; only compared against
 * pOperation->job in an assert.
 * @param[in] pContext The _mqttOperation_t whose packet is sent.
 */
867 void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,\r
868                            IotTaskPoolJob_t pSendJob,\r
869                            void * pContext )\r
870 {\r
871     size_t bytesSent = 0;\r
872     bool destroyOperation = false, waitable = false, networkPending = false;\r
873     _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;\r
874     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
875 \r
876     /* Check parameters. The task pool and job parameter is not used when asserts\r
877      * are disabled. */\r
878     ( void ) pTaskPool;\r
879     ( void ) pSendJob;\r
880     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
881     IotMqtt_Assert( pSendJob == pOperation->job );\r
882 \r
883     /* The given operation must have an allocated packet and be waiting for a status. */\r
884     IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );\r
885     IotMqtt_Assert( pOperation->u.operation.packetSize != 0 );\r
886     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
887 \r
888     /* Check if this operation is waitable. */\r
889     waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
890 \r
891     /* Check PUBLISH retry counts and limits. */\r
892     if( pOperation->u.operation.periodic.retry.limit > 0 )\r
893     {\r
894         if( _checkRetryLimit( pOperation ) == false )\r
895         {\r
                /* The status is no longer PENDING after this assignment, so the
                 * transmission and the rescheduling below are both skipped. */
896             pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE;\r
897         }\r
898         else\r
899         {\r
900             EMPTY_ELSE_MARKER;\r
901         }\r
902     }\r
903     else\r
904     {\r
905         EMPTY_ELSE_MARKER;\r
906     }\r
907 \r
908     /* Send an operation that is waiting for a response. */\r
909     if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
910     {\r
911         IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.",\r
912                      pMqttConnection,\r
913                      IotMqtt_OperationType( pOperation->u.operation.type ),\r
914                      pOperation );\r
915 \r
916         /* Transmit the MQTT packet from the operation over the network. */\r
917         bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
918                                                               pOperation->u.operation.pMqttPacket,\r
919                                                               pOperation->u.operation.packetSize );\r
920 \r
921         /* Check transmission status. */\r
922         if( bytesSent != pOperation->u.operation.packetSize )\r
923         {\r
924             pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR;\r
925         }\r
926         else\r
927         {\r
928             /* DISCONNECT operations are considered successful upon successful\r
929              * transmission. In addition, non-waitable operations with no callback\r
930              * may also be considered successful. */\r
931             if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT )\r
932             {\r
933                 /* DISCONNECT operations are always waitable. */\r
934                 IotMqtt_Assert( waitable == true );\r
935 \r
936                 pOperation->u.operation.status = IOT_MQTT_SUCCESS;\r
937             }\r
938             else if( waitable == false )\r
939             {\r
940                 if( pOperation->u.operation.notify.callback.function == NULL )\r
941                 {\r
942                     pOperation->u.operation.status = IOT_MQTT_SUCCESS;\r
943                 }\r
944                 else\r
945                 {\r
946                     EMPTY_ELSE_MARKER;\r
947                 }\r
948             }\r
949             else\r
950             {\r
951                 EMPTY_ELSE_MARKER;\r
952             }\r
953         }\r
954     }\r
955     else\r
956     {\r
957         EMPTY_ELSE_MARKER;\r
958     }\r
959 \r
960     /* Check if this operation requires further processing. */\r
961     if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )\r
962     {\r
963         /* Check if this operation should be scheduled for retransmission. */\r
964         if( pOperation->u.operation.periodic.retry.limit > 0 )\r
965         {\r
966             if( _scheduleNextRetry( pOperation ) == false )\r
967             {\r
968                 pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR;\r
969             }\r
970             else\r
971             {\r
972                 /* A successfully scheduled PUBLISH retry is awaiting a response\r
973                  * from the network. */\r
974                 networkPending = true;\r
975             }\r
976         }\r
977         else\r
978         {\r
979             /* Decrement reference count to signal completion of send job. Check\r
980              * if the operation should be destroyed. */\r
                /* Only waitable operations give up a reference here; for every
                 * other operation destroyOperation keeps its initial value of false. */
981             if( waitable == true )\r
982             {\r
983                 destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );\r
984             }\r
985             else\r
986             {\r
987                 EMPTY_ELSE_MARKER;\r
988             }\r
989 \r
990             /* If the operation should not be destroyed, transfer it from the\r
991              * pending processing to the pending response list. */\r
992             if( destroyOperation == false )\r
993             {\r
994                 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
995 \r
996                 /* Operation must be linked. */\r
997                 IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) );\r
998 \r
999                 /* Transfer to pending response list. */\r
1000                 IotListDouble_Remove( &( pOperation->link ) );\r
1001                 IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),\r
1002                                           &( pOperation->link ) );\r
1003 \r
1004                 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
1005 \r
1006                 /* This operation is now awaiting a response from the network. */\r
1007                 networkPending = true;\r
1008             }\r
1009             else\r
1010             {\r
1011                 EMPTY_ELSE_MARKER;\r
1012             }\r
1013         }\r
1014     }\r
1015     else\r
1016     {\r
1017         EMPTY_ELSE_MARKER;\r
1018     }\r
1019 \r
1020     /* Destroy the operation or notify of completion if necessary. */\r
1021     if( destroyOperation == true )\r
1022     {\r
1023         _IotMqtt_DestroyOperation( pOperation );\r
1024     }\r
1025     else\r
1026     {\r
1027         /* Do not check the operation status if a network response is pending,\r
1028          * since a network response could modify the status. */\r
1029         if( networkPending == false )\r
1030         {\r
1031             /* Notify of operation completion if this job set a status. */\r
1032             if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )\r
1033             {\r
1034                 _IotMqtt_Notify( pOperation );\r
1035             }\r
1036             else\r
1037             {\r
1038                 EMPTY_ELSE_MARKER;\r
1039             }\r
1040         }\r
1041         else\r
1042         {\r
1043             EMPTY_ELSE_MARKER;\r
1044         }\r
1045     }\r
1046 }\r
1047 \r
1048 /*-----------------------------------------------------------*/\r
1049 \r
1050 void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,\r
1051                                          IotTaskPoolJob_t pOperationJob,\r
1052                                          void * pContext )\r
1053 {\r
1054     _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;\r
1055     IotMqttCallbackParam_t callbackParam = { 0 };\r
1056 \r
1057     /* Check parameters. The task pool and job parameter is not used when asserts\r
1058      * are disabled. */\r
1059     ( void ) pTaskPool;\r
1060     ( void ) pOperationJob;\r
1061     /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */\r
1062     IotMqtt_Assert( pOperationJob == pOperation->job );\r
1063 \r
1064     /* The operation's callback function and status must be set. */\r
1065     IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL );\r
1066     IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );\r
1067 \r
1068     callbackParam.mqttConnection = pOperation->pMqttConnection;\r
1069     callbackParam.u.operation.type = pOperation->u.operation.type;\r
1070     callbackParam.u.operation.reference = pOperation;\r
1071     callbackParam.u.operation.result = pOperation->u.operation.status;\r
1072 \r
1073     /* Invoke the user callback function. */\r
1074     pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,\r
1075                                                       &callbackParam );\r
1076 \r
1077     /* Attempt to destroy the operation once the user callback returns. */\r
1078     if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )\r
1079     {\r
1080         _IotMqtt_DestroyOperation( pOperation );\r
1081     }\r
1082     else\r
1083     {\r
1084         EMPTY_ELSE_MARKER;\r
1085     }\r
1086 }\r
1087 \r
1088 /*-----------------------------------------------------------*/\r
1089 \r
1090 IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,\r
1091                                            IotTaskPoolRoutine_t jobRoutine,\r
1092                                            uint32_t delay )\r
1093 {\r
1094     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
1095     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
1096 \r
1097     /* Check that job routine is valid. */\r
1098     IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) ||\r
1099                     ( jobRoutine == _IotMqtt_ProcessCompletedOperation ) ||\r
1100                     ( jobRoutine == _IotMqtt_ProcessIncomingPublish ) );\r
1101 \r
1102     /* Creating a new job should never fail when parameters are valid. */\r
1103     taskPoolStatus = IotTaskPool_CreateJob( jobRoutine,\r
1104                                             pOperation,\r
1105                                             &( pOperation->jobStorage ),\r
1106                                             &( pOperation->job ) );\r
1107     IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );\r
1108 \r
1109     /* Schedule the new job with a delay. */\r
1110     taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
1111                                                    pOperation->job,\r
1112                                                    delay );\r
1113 \r
1114     if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
1115     {\r
1116         /* Scheduling a newly-created job should never be invalid or illegal. */\r
1117         IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER );\r
1118         IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION );\r
1119 \r
1120         IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.",\r
1121                     pOperation->pMqttConnection,\r
1122                     IotMqtt_OperationType( pOperation->u.operation.type ),\r
1123                     pOperation,\r
1124                     IotTaskPool_strerror( taskPoolStatus ) );\r
1125 \r
1126         status = IOT_MQTT_SCHEDULING_ERROR;\r
1127     }\r
1128     else\r
1129     {\r
1130         EMPTY_ELSE_MARKER;\r
1131     }\r
1132 \r
1133     return status;\r
1134 }\r
1135 \r
1136 /*-----------------------------------------------------------*/\r
1137 \r
/**
 * @brief Searches the connection's pendingResponse list for an operation and
 * removes it from that list.
 *
 * The lookup and the removal both run with the connection's referencesMutex
 * held. The first list entry accepted by _mqttOperation_match for the given
 * type and packet identifier is the candidate. A candidate with a retry limit
 * has its job cancelled through IotTaskPool_TryCancel; if the cancellation
 * fails, the candidate is ignored and stays in the list. A candidate without
 * a retry limit that is waitable has its jobReference incremented.
 *
 * @param[in] pMqttConnection Connection whose pendingResponse list is searched.
 * @param[in] type Operation type to match.
 * @param[in] pPacketIdentifier Packet identifier to match. May be NULL: it is
 * dereferenced only for logging when non-NULL and is otherwise handed to the
 * match function unchanged.
 *
 * @return The matching operation, already unlinked from pendingResponse, or
 * NULL when nothing matched or the match's retry job could not be cancelled.
 */
1138 _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,\r
1139                                            IotMqttOperationType_t type,\r
1140                                            const uint16_t * pPacketIdentifier )\r
1141 {\r
1142     bool waitable = false;\r
1143     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
1144     _mqttOperation_t * pResult = NULL;\r
1145     IotLink_t * pResultLink = NULL;\r
1146     _operationMatchParam_t param = { 0 };\r
1147 \r
1148     param.type = type;\r
1149     param.pPacketIdentifier = pPacketIdentifier;\r
1150 \r
1151     if( pPacketIdentifier != NULL )\r
1152     {\r
1153         IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response "\r
1154                      "with packet identifier %hu.",\r
1155                      pMqttConnection,\r
1156                      IotMqtt_OperationType( type ),\r
1157                      *pPacketIdentifier );\r
1158     }\r
1159     else\r
1160     {\r
1161         IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.",\r
1162                      pMqttConnection,\r
1163                      IotMqtt_OperationType( type ) );\r
1164     }\r
1165 \r
1166     /* Find and remove the first matching element in the list. */\r
1167     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
1168     pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ),\r
1169                                                 NULL,\r
1170                                                 _mqttOperation_match,\r
1171                                                 &param );\r
1172 \r
1173     /* Check if a match was found. */\r
1174     if( pResultLink != NULL )\r
1175     {\r
1176         /* Get operation pointer and check if it is waitable. */\r
1177         pResult = IotLink_Container( _mqttOperation_t, pResultLink, link );\r
1178         waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
1179 \r
1180         /* Check if the matched operation is a PUBLISH with retry. If it is, cancel\r
1181          * the retry job. */\r
1182         if( pResult->u.operation.periodic.retry.limit > 0 )\r
1183         {\r
1184             taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
1185                                                     pResult->job,\r
1186                                                     NULL );\r
1187 \r
1188             /* If the retry job could not be canceled, then it is currently\r
1189              * executing. Ignore the operation. */\r
1190             if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
1191             {\r
                     /* Clearing pResult also skips the list removal further down,
                      * so the operation remains in pendingResponse. */
1192                 pResult = NULL;\r
1193             }\r
1194             else\r
1195             {\r
1196                 /* Check job reference counts. A waitable operation should have a\r
1197                  * count of 2; a non-waitable operation should have a count of 1. */\r
1198                 IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) );\r
1199             }\r
1200         }\r
1201         else\r
1202         {\r
1203             /* An operation with no retry in the pending responses list should\r
1204              * always have a job reference of 1. */\r
1205             IotMqtt_Assert( pResult->u.operation.jobReference == 1 );\r
1206 \r
1207             /* Increment job references of a waitable operation to prevent Wait from\r
1208              * destroying this operation if it times out. */\r
1209             if( waitable == true )\r
1210             {\r
1211                 ( pResult->u.operation.jobReference )++;\r
1212 \r
1213                 IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.",\r
1214                              pMqttConnection,\r
1215                              IotMqtt_OperationType( type ),\r
1216                              pResult,\r
1217                              ( long int ) ( pResult->u.operation.jobReference - 1 ),\r
1218                              ( long int ) ( pResult->u.operation.jobReference ) );\r
1219             }\r
1220         }\r
1221     }\r
1222     else\r
1223     {\r
1224         EMPTY_ELSE_MARKER;\r
1225     }\r
1226 \r
     /* pResult is non-NULL here only for a match that was not discarded above. */
1227     if( pResult != NULL )\r
1228     {\r
1229         IotLogDebug( "(MQTT connection %p) Found operation %s.",\r
1230                      pMqttConnection,\r
1231                      IotMqtt_OperationType( type ) );\r
1232 \r
1233         /* Remove the matched operation from the list. */\r
1234         IotListDouble_Remove( &( pResult->link ) );\r
1235     }\r
1236     else\r
1237     {\r
1238         IotLogDebug( "(MQTT connection %p) Operation %s not found.",\r
1239                      pMqttConnection,\r
1240                      IotMqtt_OperationType( type ) );\r
1241     }\r
1242 \r
1243     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
1244 \r
1245     return pResult;\r
1246 }\r
1247 \r
1248 /*-----------------------------------------------------------*/\r
1249 \r
/**
 * @brief Reports an operation whose status has been set to whoever is waiting
 * for it.
 *
 * For a SUBSCRIBE whose status is neither IOT_MQTT_SUCCESS nor
 * IOT_MQTT_SERVER_REFUSED, _IotMqtt_RemoveSubscriptionByPacket is called first.
 * A non-waitable operation that has a callback is then scheduled to run
 * _IotMqtt_ProcessCompletedOperation and is placed at the head of the
 * connection's pendingProcessing list. In every other case (waitable
 * operation, no callback, or scheduling failed) one reference is dropped, the
 * operation is destroyed if that was the last reference, and the semaphore of
 * a waitable operation is posted.
 *
 * @param[in] pOperation Operation to report.
 */
1250 void _IotMqtt_Notify( _mqttOperation_t * pOperation )\r
1251 {\r
     /* status stays IOT_MQTT_SCHEDULING_ERROR unless a callback job is
      * scheduled below; that value selects the release path at the end. */
1252     IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR;\r
1253     _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;\r
1254 \r
1255     /* Check if operation is waitable. */\r
1256     bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;\r
1257 \r
1258     /* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected\r
1259      * subscriptions are removed by the deserializer, so not removed here. */\r
1260     if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE )\r
1261     {\r
1262         switch( pOperation->u.operation.status )\r
1263         {\r
1264             case IOT_MQTT_SUCCESS:\r
1265                 break;\r
1266 \r
1267             case IOT_MQTT_SERVER_REFUSED:\r
1268                 break;\r
1269 \r
1270             default:\r
1271                 _IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection,\r
1272                                                      pOperation->u.operation.packetIdentifier,\r
1273                                                      -1 );\r
1274                 break;\r
1275         }\r
1276     }\r
1277     else\r
1278     {\r
1279         EMPTY_ELSE_MARKER;\r
1280     }\r
1281 \r
1282     /* Schedule callback invocation for non-waitable operation. */\r
1283     if( waitable == false )\r
1284     {\r
1285         /* Non-waitable operation should have job reference of 1. */\r
1286         IotMqtt_Assert( pOperation->u.operation.jobReference == 1 );\r
1287 \r
1288         /* Schedule an invocation of the callback. */\r
1289         if( pOperation->u.operation.notify.callback.function != NULL )\r
1290         {\r
             /* The mutex is held across both the scheduling call and the list
              * update that follows it. */
1291             IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
1292 \r
1293             status = _IotMqtt_ScheduleOperation( pOperation,\r
1294                                                  _IotMqtt_ProcessCompletedOperation,\r
1295                                                  0 );\r
1296 \r
1297             if( status == IOT_MQTT_SUCCESS )\r
1298             {\r
1299                 IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.",\r
1300                              pOperation->pMqttConnection,\r
1301                              IotMqtt_OperationType( pOperation->u.operation.type ),\r
1302                              pOperation );\r
1303 \r
1304                 /* Place the scheduled operation back in the list of operations pending\r
1305                  * processing. */\r
1306                 if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
1307                 {\r
1308                     IotListDouble_Remove( &( pOperation->link ) );\r
1309                 }\r
1310                 else\r
1311                 {\r
1312                     EMPTY_ELSE_MARKER;\r
1313                 }\r
1314 \r
1315                 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
1316                                           &( pOperation->link ) );\r
1317             }\r
1318             else\r
1319             {\r
1320                 IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.",\r
1321                             pOperation->pMqttConnection,\r
1322                             IotMqtt_OperationType( pOperation->u.operation.type ),\r
1323                             pOperation );\r
1324             }\r
1325 \r
1326             IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
1327         }\r
1328         else\r
1329         {\r
1330             EMPTY_ELSE_MARKER;\r
1331         }\r
1332     }\r
1333     else\r
1334     {\r
1335         EMPTY_ELSE_MARKER;\r
1336     }\r
1337 \r
1338     /* Operations that weren't scheduled may be destroyed. */\r
1339     if( status == IOT_MQTT_SCHEDULING_ERROR )\r
1340     {\r
1341         /* Decrement reference count of operations not scheduled. */\r
1342         if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )\r
1343         {\r
1344             _IotMqtt_DestroyOperation( pOperation );\r
1345         }\r
1346         else\r
1347         {\r
1348             EMPTY_ELSE_MARKER;\r
1349         }\r
1350 \r
         /* NOTE(review): pOperation is still dereferenced below even when
          * _IotMqtt_DestroyOperation was called just above. If a waitable
          * operation can drop to zero references at this point, this looks
          * like a use-after-destroy - confirm against _IotMqtt_DestroyOperation
          * and the code that waits on this semaphore. */
1351         /* Post to a waitable operation's semaphore. */\r
1352         if( waitable == true )\r
1353         {\r
1354             IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "\r
1355                          "notified of completion.",\r
1356                          pOperation->pMqttConnection,\r
1357                          IotMqtt_OperationType( pOperation->u.operation.type ),\r
1358                          pOperation );\r
1359 \r
1360             IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );\r
1361         }\r
1362         else\r
1363         {\r
1364             EMPTY_ELSE_MARKER;\r
1365         }\r
1366     }\r
1367     else\r
1368     {\r
1369         IotMqtt_Assert( status == IOT_MQTT_SUCCESS );\r
1370     }\r
1371 }\r
1372 \r
1373 /*-----------------------------------------------------------*/\r