2 * Amazon FreeRTOS MQTT V2.0.0
\r
3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://aws.amazon.com/freertos
\r
23 * http://www.FreeRTOS.org
\r
27 * @file iot_mqtt_operation.c
\r
28 * @brief Implements functions that process MQTT operations.
\r
31 /* The config header is always included first. */
\r
32 #include "iot_config.h"
\r
34 /* Standard includes. */
\r
37 /* Error handling include. */
\r
38 #include "private/iot_error.h"
\r
40 /* MQTT internal include. */
\r
41 #include "private/iot_mqtt_internal.h"
\r
43 /* Platform layer includes. */
\r
44 #include "platform/iot_clock.h"
\r
45 #include "platform/iot_threads.h"
\r
47 /* Atomics include. */
\r
48 #include "iot_atomic.h"
\r
50 /*-----------------------------------------------------------*/
\r
53 * @brief First parameter to #_mqttOperation_match.
\r
55 typedef struct _operationMatchParam
\r
57 IotMqttOperationType_t type; /**< @brief The type of operation to look for. */
\r
58 const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation.
\r
59 * Set to `NULL` to ignore packet identifier. */
\r
60 } _operationMatchParam_t;
\r
62 /*-----------------------------------------------------------*/
\r
65 * @brief Match an MQTT operation by type and packet identifier.
\r
67 * @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t.
\r
68 * @param[in] pMatch Pointer to an #_operationMatchParam_t.
\r
70 * @return `true` if the operation matches the parameters in `pArgument`; `false`
\r
73 static bool _mqttOperation_match( const IotLink_t * pOperationLink,
\r
77 * @brief Check if an operation with retry has exceeded its retry limit.
\r
79 * If a PUBLISH operation is available for retry, this function also sets any
\r
80 * necessary DUP flags.
\r
82 * @param[in] pOperation The operation to check.
\r
84 * @return `true` if the operation may be retried; `false` otherwise.
\r
86 static bool _checkRetryLimit( _mqttOperation_t * pOperation );
\r
89 * @brief Schedule the next send of an operation with retry.
\r
91 * @param[in] pOperation The operation to schedule.
\r
93 * @return `true` if the reschedule succeeded; `false` otherwise.
\r
95 static bool _scheduleNextRetry( _mqttOperation_t * pOperation );
\r
97 /*-----------------------------------------------------------*/
\r
99 static bool _mqttOperation_match( const IotLink_t * pOperationLink,
\r
102 bool match = false;
\r
104 /* Because this function is called from a container function, the given link
\r
105 * must never be NULL. */
\r
106 IotMqtt_Assert( pOperationLink != NULL );
\r
108 _mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t,
\r
111 _operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch;
\r
113 /* Check for matching operations. */
\r
114 if( pParam->type == pOperation->u.operation.type )
\r
116 /* Check for matching packet identifiers. */
\r
117 if( pParam->pPacketIdentifier == NULL )
\r
123 match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier );
\r
134 /*-----------------------------------------------------------*/
\r
136 static bool _checkRetryLimit( _mqttOperation_t * pOperation )
\r
138 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
139 bool status = true, setDup = false;
\r
141 /* Choose a set DUP function. */
\r
142 void ( * publishSetDup )( uint8_t *,
\r
144 uint16_t * ) = _IotMqtt_PublishSetDup;
\r
146 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
147 if( pMqttConnection->pSerializer != NULL )
\r
149 if( pMqttConnection->pSerializer->serialize.publishSetDup != NULL )
\r
151 publishSetDup = pMqttConnection->pSerializer->serialize.publishSetDup;
\r
162 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
164 /* Only PUBLISH may be retried. */
\r
165 IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );
\r
167 /* Check if the retry limit is exhausted. */
\r
168 if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit )
\r
170 /* The retry count may be at most one more than the retry limit, which
\r
171 * accounts for the final check for a PUBACK. */
\r
172 IotMqtt_Assert( pOperation->u.operation.periodic.retry.count ==
\r
173 pOperation->u.operation.periodic.retry.limit + 1 );
\r
175 IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",
\r
178 pOperation->u.operation.periodic.retry.limit );
\r
184 if( pOperation->u.operation.periodic.retry.count == 1 )
\r
186 /* The DUP flag should always be set on the first retry. */
\r
189 else if( pMqttConnection->awsIotMqttMode == true )
\r
191 /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet
\r
192 * identifier) must be reset on every retry. */
\r
200 if( setDup == true )
\r
202 /* In AWS IoT MQTT mode, the references mutex must be locked to
\r
203 * prevent the packet identifier from being read while it is being
\r
205 if( pMqttConnection->awsIotMqttMode == true )
\r
207 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
214 /* Always set the DUP flag on the first retry. */
\r
215 publishSetDup( pOperation->u.operation.pMqttPacket,
\r
216 pOperation->u.operation.pPacketIdentifierHigh,
\r
217 &( pOperation->u.operation.packetIdentifier ) );
\r
219 if( pMqttConnection->awsIotMqttMode == true )
\r
221 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
237 /*-----------------------------------------------------------*/
\r
239 static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
\r
241 bool firstRetry = false;
\r
242 uint32_t scheduleDelay = 0;
\r
243 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
244 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
246 /* This function should never be called with retry count greater than
\r
248 IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <=
\r
249 pOperation->u.operation.periodic.retry.limit );
\r
251 /* Increment the retry count. */
\r
252 ( pOperation->u.operation.periodic.retry.count )++;
\r
254 /* Check for a response shortly for the final retry. Otherwise, calculate the
\r
255 * next retry period. */
\r
256 if( pOperation->u.operation.periodic.retry.count >
\r
257 pOperation->u.operation.periodic.retry.limit )
\r
259 scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;
\r
261 IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check "
\r
262 "for response in %d ms.",
\r
265 IOT_MQTT_RESPONSE_WAIT_MS );
\r
269 scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs;
\r
271 /* Double the retry period, subject to a ceiling value. */
\r
272 pOperation->u.operation.periodic.retry.nextPeriodMs *= 2;
\r
274 if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING )
\r
276 pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING;
\r
283 IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",
\r
286 ( unsigned long ) pOperation->u.operation.periodic.retry.count,
\r
287 ( unsigned long ) pOperation->u.operation.periodic.retry.limit,
\r
288 ( unsigned long ) scheduleDelay );
\r
290 /* Check if this is the first retry. */
\r
291 firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 );
\r
293 /* On the first retry, the PUBLISH will be moved from the pending processing
\r
294 * list to the pending responses list. Lock the connection references mutex
\r
295 * to manipulate the lists. */
\r
296 if( firstRetry == true )
\r
298 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
306 /* Reschedule the PUBLISH for another send. */
\r
307 status = _IotMqtt_ScheduleOperation( pOperation,
\r
308 _IotMqtt_ProcessSend,
\r
311 /* Check for successful reschedule. */
\r
312 if( status == IOT_MQTT_SUCCESS )
\r
314 /* Move a successfully rescheduled PUBLISH from the pending processing
\r
315 * list to the pending responses list on the first retry. */
\r
316 if( firstRetry == true )
\r
318 /* Operation must be linked. */
\r
319 IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) == true );
\r
321 /* Transfer to pending response list. */
\r
322 IotListDouble_Remove( &( pOperation->link ) );
\r
323 IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),
\r
324 &( pOperation->link ) );
\r
336 /* The references mutex only needs to be unlocked on the first retry, since
\r
337 * only the first retry manipulates the connection lists. */
\r
338 if( firstRetry == true )
\r
340 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
347 return( status == IOT_MQTT_SUCCESS );
\r
350 /*-----------------------------------------------------------*/
\r
352 IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
\r
354 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
355 _mqttOperation_t ** pNewOperation )
\r
357 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
358 bool decrementOnError = false;
\r
359 _mqttOperation_t * pOperation = NULL;
\r
360 bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );
\r
362 /* If the waitable flag is set, make sure that there's no callback. */
\r
363 if( waitable == true )
\r
365 if( pCallbackInfo != NULL )
\r
367 IotLogError( "Callback should not be set for a waitable operation." );
\r
369 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
\r
381 IotLogDebug( "(MQTT connection %p) Creating new operation record.",
\r
384 /* Increment the reference count for the MQTT connection when creating a new
\r
386 if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false )
\r
388 IotLogError( "(MQTT connection %p) New operation record cannot be created"
\r
389 " for a closed connection",
\r
392 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
\r
396 /* Reference count will need to be decremented on error. */
\r
397 decrementOnError = true;
\r
400 /* Allocate memory for a new operation. */
\r
401 pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );
\r
403 if( pOperation == NULL )
\r
405 IotLogError( "(MQTT connection %p) Failed to allocate memory for new "
\r
406 "operation record.",
\r
409 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
413 /* Clear the operation data. */
\r
414 ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );
\r
416 /* Initialize some members of the new operation. */
\r
417 pOperation->pMqttConnection = pMqttConnection;
\r
418 pOperation->u.operation.jobReference = 1;
\r
419 pOperation->u.operation.flags = flags;
\r
420 pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING;
\r
423 /* Check if the waitable flag is set. If it is, create a semaphore to
\r
425 if( waitable == true )
\r
427 /* Create a semaphore to wait on for a waitable operation. */
\r
428 if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false )
\r
430 IotLogError( "(MQTT connection %p) Failed to create semaphore for "
\r
431 "waitable operation.",
\r
434 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
438 /* A waitable operation is created with an additional reference for the
\r
439 * Wait function. */
\r
440 ( pOperation->u.operation.jobReference )++;
\r
445 /* If the waitable flag isn't set but a callback is, copy the callback
\r
447 if( pCallbackInfo != NULL )
\r
449 pOperation->u.operation.notify.callback = *pCallbackInfo;
\r
457 /* Add this operation to the MQTT connection's operation list. */
\r
458 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
459 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
\r
460 &( pOperation->link ) );
\r
461 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
463 /* Set the output parameter. */
\r
464 *pNewOperation = pOperation;
\r
466 /* Clean up operation and decrement reference count if this function failed. */
\r
467 IOT_FUNCTION_CLEANUP_BEGIN();
\r
469 if( status != IOT_MQTT_SUCCESS )
\r
471 if( decrementOnError == true )
\r
473 _IotMqtt_DecrementConnectionReferences( pMqttConnection );
\r
480 if( pOperation != NULL )
\r
482 IotMqtt_FreeOperation( pOperation );
\r
494 IOT_FUNCTION_CLEANUP_END();
\r
497 /*-----------------------------------------------------------*/
\r
499 bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
\r
502 bool destroyOperation = false;
\r
503 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
504 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
506 /* Attempt to cancel the operation's job. */
\r
507 if( cancelJob == true )
\r
509 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
513 /* If the operation's job was not canceled, it must be already executing.
\r
514 * Any other return value is invalid. */
\r
515 IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
\r
516 ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
\r
518 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
520 IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.",
\r
522 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
535 /* Decrement job reference count. */
\r
536 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
538 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
539 pOperation->u.operation.jobReference--;
\r
541 IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed"
\r
542 " from %ld to %ld.",
\r
544 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
546 ( long ) ( pOperation->u.operation.jobReference + 1 ),
\r
547 ( long ) ( pOperation->u.operation.jobReference ) );
\r
549 /* The job reference count must be 0 or 1 after the decrement. */
\r
550 IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||
\r
551 ( pOperation->u.operation.jobReference == 1 ) );
\r
553 /* This operation may be destroyed if its reference count is 0. */
\r
554 if( pOperation->u.operation.jobReference == 0 )
\r
556 destroyOperation = true;
\r
563 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
570 return destroyOperation;
\r
573 /*-----------------------------------------------------------*/
\r
575 void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation )
\r
577 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
579 /* Default free packet function. */
\r
580 void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
\r
582 IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.",
\r
584 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
587 /* The job reference count must be between 0 and 2. */
\r
588 IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) &&
\r
589 ( pOperation->u.operation.jobReference <= 2 ) );
\r
591 /* Jobs to be destroyed should be removed from the MQTT connection's
\r
593 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
595 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
597 IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.",
\r
599 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
603 IotListDouble_Remove( &( pOperation->link ) );
\r
607 IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.",
\r
609 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
613 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
615 /* Free any allocated MQTT packet. */
\r
616 if( pOperation->u.operation.pMqttPacket != NULL )
\r
618 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
619 if( pMqttConnection->pSerializer != NULL )
\r
621 if( pMqttConnection->pSerializer->freePacket != NULL )
\r
623 freePacket = pMqttConnection->pSerializer->freePacket;
\r
634 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
636 freePacket( pOperation->u.operation.pMqttPacket );
\r
638 IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.",
\r
640 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
645 IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.",
\r
647 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
651 /* Check if a wait semaphore was created for this operation. */
\r
652 if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
\r
654 IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) );
\r
656 IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.",
\r
658 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
666 IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.",
\r
668 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
671 /* Free the memory used to hold operation data. */
\r
672 IotMqtt_FreeOperation( pOperation );
\r
674 /* Decrement the MQTT connection's reference count after destroying an
\r
676 _IotMqtt_DecrementConnectionReferences( pMqttConnection );
\r
679 /*-----------------------------------------------------------*/
\r
681 void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
\r
682 IotTaskPoolJob_t pKeepAliveJob,
\r
685 bool status = true;
\r
686 uint32_t swapStatus = 0;
\r
687 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
688 size_t bytesSent = 0;
\r
690 /* Swap status is not checked when asserts are disabled. */
\r
691 ( void ) swapStatus;
\r
693 /* Retrieve the MQTT connection from the context. */
\r
694 _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;
\r
695 _mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq );
\r
697 /* Check parameters. */
\r
698 /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
\r
699 IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job );
\r
701 /* Check that keep-alive interval is valid. The MQTT spec states its maximum
\r
702 * value is 65,535 seconds. */
\r
703 IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 );
\r
705 /* Only two values are valid for the next keep alive job delay. */
\r
706 IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
\r
707 pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) ||
\r
708 ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs
\r
709 == IOT_MQTT_RESPONSE_WAIT_MS ) );
\r
711 IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );
\r
713 /* Re-create the keep-alive job for rescheduling. This should never fail. */
\r
714 taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
\r
716 IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ),
\r
718 IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );
\r
720 /* Determine whether to send a PINGREQ or check for PINGRESP. */
\r
721 if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
\r
722 pPingreqOperation->u.operation.periodic.ping.keepAliveMs )
\r
724 IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );
\r
726 /* Because PINGREQ may be used to keep the MQTT connection alive, it is
\r
727 * more important than other operations. Bypass the queue of jobs for
\r
728 * operations by directly sending the PINGREQ in this job. */
\r
729 bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
\r
730 pPingreqOperation->u.operation.pMqttPacket,
\r
731 pPingreqOperation->u.operation.packetSize );
\r
733 if( bytesSent != pPingreqOperation->u.operation.packetSize )
\r
735 IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );
\r
740 /* Assume the keep-alive will fail. The network receive callback will
\r
741 * clear the failure flag upon receiving a PINGRESP. */
\r
742 swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ),
\r
745 IotMqtt_Assert( swapStatus == 1 );
\r
747 /* Schedule a check for PINGRESP. */
\r
748 pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS;
\r
750 IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",
\r
752 IOT_MQTT_RESPONSE_WAIT_MS );
\r
757 IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );
\r
759 if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 )
\r
761 IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );
\r
763 /* PINGRESP was received. Schedule the next PINGREQ transmission. */
\r
764 pPingreqOperation->u.operation.periodic.ping.nextPeriodMs =
\r
765 pPingreqOperation->u.operation.periodic.ping.keepAliveMs;
\r
769 IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.",
\r
771 IOT_MQTT_RESPONSE_WAIT_MS );
\r
773 /* The network receive callback did not clear the failure flag. */
\r
778 /* When a PINGREQ is successfully sent, reschedule this job to check for a
\r
779 * response shortly. */
\r
780 if( status == true )
\r
782 taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,
\r
784 pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
\r
786 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
788 IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.",
\r
790 ( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
\r
794 IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.",
\r
796 IotTaskPool_strerror( taskPoolStatus ) );
\r
806 /* Close the connection on failures. */
\r
807 if( status == false )
\r
809 _IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT,
\r
818 /*-----------------------------------------------------------*/
\r
820 void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
\r
821 IotTaskPoolJob_t pPublishJob,
\r
824 _mqttOperation_t * pOperation = pContext;
\r
825 IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL };
\r
827 /* Check parameters. The task pool and job parameter is not used when asserts
\r
829 ( void ) pTaskPool;
\r
830 ( void ) pPublishJob;
\r
831 /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
\r
832 IotMqtt_Assert( pOperation->incomingPublish == true );
\r
833 IotMqtt_Assert( pPublishJob == pOperation->job );
\r
835 /* Remove the operation from the pending processing list. */
\r
836 IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) );
\r
838 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
840 IotListDouble_Remove( &( pOperation->link ) );
\r
844 /* This operation may have already been removed by cleanup of pending
\r
845 * operations (called from Disconnect). In that case, do nothing here. */
\r
849 IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) );
\r
851 /* Process the current PUBLISH. */
\r
852 callbackParam.u.message.info = pOperation->u.publish.publishInfo;
\r
854 _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,
\r
857 /* Free buffers associated with the current PUBLISH message. */
\r
858 IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL );
\r
859 IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );
\r
861 /* Free the incoming PUBLISH operation. */
\r
862 IotMqtt_FreeOperation( pOperation );
\r
865 /*-----------------------------------------------------------*/
\r
867 void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
\r
868 IotTaskPoolJob_t pSendJob,
\r
871 size_t bytesSent = 0;
\r
872 bool destroyOperation = false, waitable = false, networkPending = false;
\r
873 _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
\r
874 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
876 /* Check parameters. The task pool and job parameter is not used when asserts
\r
878 ( void ) pTaskPool;
\r
880 /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
\r
881 IotMqtt_Assert( pSendJob == pOperation->job );
\r
883 /* The given operation must have an allocated packet and be waiting for a status. */
\r
884 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
885 IotMqtt_Assert( pOperation->u.operation.packetSize != 0 );
\r
886 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
888 /* Check if this operation is waitable. */
\r
889 waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
\r
891 /* Check PUBLISH retry counts and limits. */
\r
892 if( pOperation->u.operation.periodic.retry.limit > 0 )
\r
894 if( _checkRetryLimit( pOperation ) == false )
\r
896 pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE;
\r
908 /* Send an operation that is waiting for a response. */
\r
909 if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
\r
911 IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.",
\r
913 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
916 /* Transmit the MQTT packet from the operation over the network. */
\r
917 bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
\r
918 pOperation->u.operation.pMqttPacket,
\r
919 pOperation->u.operation.packetSize );
\r
921 /* Check transmission status. */
\r
922 if( bytesSent != pOperation->u.operation.packetSize )
\r
924 pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR;
\r
928 /* DISCONNECT operations are considered successful upon successful
\r
929 * transmission. In addition, non-waitable operations with no callback
\r
930 * may also be considered successful. */
\r
931 if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT )
\r
933 /* DISCONNECT operations are always waitable. */
\r
934 IotMqtt_Assert( waitable == true );
\r
936 pOperation->u.operation.status = IOT_MQTT_SUCCESS;
\r
938 else if( waitable == false )
\r
940 if( pOperation->u.operation.notify.callback.function == NULL )
\r
942 pOperation->u.operation.status = IOT_MQTT_SUCCESS;
\r
960 /* Check if this operation requires further processing. */
\r
961 if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
\r
963 /* Check if this operation should be scheduled for retransmission. */
\r
964 if( pOperation->u.operation.periodic.retry.limit > 0 )
\r
966 if( _scheduleNextRetry( pOperation ) == false )
\r
968 pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR;
\r
972 /* A successfully scheduled PUBLISH retry is awaiting a response
\r
973 * from the network. */
\r
974 networkPending = true;
\r
979 /* Decrement reference count to signal completion of send job. Check
\r
980 * if the operation should be destroyed. */
\r
981 if( waitable == true )
\r
983 destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );
\r
990 /* If the operation should not be destroyed, transfer it from the
\r
991 * pending processing to the pending response list. */
\r
992 if( destroyOperation == false )
\r
994 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
996 /* Operation must be linked. */
\r
997 IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) );
\r
999 /* Transfer to pending response list. */
\r
1000 IotListDouble_Remove( &( pOperation->link ) );
\r
1001 IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),
\r
1002 &( pOperation->link ) );
\r
1004 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
1006 /* This operation is now awaiting a response from the network. */
\r
1007 networkPending = true;
\r
1011 EMPTY_ELSE_MARKER;
\r
1017 EMPTY_ELSE_MARKER;
\r
1020 /* Destroy the operation or notify of completion if necessary. */
\r
1021 if( destroyOperation == true )
\r
1023 _IotMqtt_DestroyOperation( pOperation );
\r
1027 /* Do not check the operation status if a network response is pending,
\r
1028 * since a network response could modify the status. */
\r
1029 if( networkPending == false )
\r
1031 /* Notify of operation completion if this job set a status. */
\r
1032 if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )
\r
1034 _IotMqtt_Notify( pOperation );
\r
1038 EMPTY_ELSE_MARKER;
\r
1043 EMPTY_ELSE_MARKER;
\r
1048 /*-----------------------------------------------------------*/
\r
1050 void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,
\r
1051 IotTaskPoolJob_t pOperationJob,
\r
1054 _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
\r
1055 IotMqttCallbackParam_t callbackParam = { 0 };
\r
1057 /* Check parameters. The task pool and job parameter is not used when asserts
\r
1058 * are disabled. */
\r
1059 ( void ) pTaskPool;
\r
1060 ( void ) pOperationJob;
\r
1061 /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
\r
1062 IotMqtt_Assert( pOperationJob == pOperation->job );
\r
1064 /* The operation's callback function and status must be set. */
\r
1065 IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL );
\r
1066 IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );
\r
1068 callbackParam.mqttConnection = pOperation->pMqttConnection;
\r
1069 callbackParam.u.operation.type = pOperation->u.operation.type;
\r
1070 callbackParam.u.operation.reference = pOperation;
\r
1071 callbackParam.u.operation.result = pOperation->u.operation.status;
\r
1073 /* Invoke the user callback function. */
\r
1074 pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,
\r
1077 /* Attempt to destroy the operation once the user callback returns. */
\r
1078 if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
\r
1080 _IotMqtt_DestroyOperation( pOperation );
\r
1084 EMPTY_ELSE_MARKER;
\r
1088 /*-----------------------------------------------------------*/
\r
/**
 * @brief Create a task pool job for an MQTT operation and schedule it on the
 * system task pool.
 *
 * The job is created with IotTaskPool_CreateJob, using the operation's own
 * jobStorage and job members, and is then passed to
 * IotTaskPool_ScheduleDeferred on IOT_SYSTEM_TASKPOOL.
 *
 * @param pOperation Operation whose job is created and scheduled; its
 * jobStorage and job members are handed to the task pool.
 * @param jobRoutine Routine for the new job. Asserted to be one of
 * _IotMqtt_ProcessSend, _IotMqtt_ProcessCompletedOperation or
 * _IotMqtt_ProcessIncomingPublish.
 *
 * NOTE(review): this listing omits some original lines (the rest of the
 * parameter list, some call arguments, braces and the return statement).
 * Any further parameter and the exact return path cannot be confirmed from
 * here. The local status starts as IOT_MQTT_SUCCESS and becomes
 * IOT_MQTT_SCHEDULING_ERROR when scheduling fails; presumably that is the
 * value returned - verify against the full file.
 */
1090 IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,
\r
1091 IotTaskPoolRoutine_t jobRoutine,
\r
1094 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
1095 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
1097 /* Check that job routine is valid. */
\r
1098 IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) ||
\r
1099 ( jobRoutine == _IotMqtt_ProcessCompletedOperation ) ||
\r
1100 ( jobRoutine == _IotMqtt_ProcessIncomingPublish ) );
\r
1102 /* Creating a new job should never fail when parameters are valid. */
\r
1103 taskPoolStatus = IotTaskPool_CreateJob( jobRoutine,
\r
1105 &( pOperation->jobStorage ),
\r
1106 &( pOperation->job ) );
\r
/* Job creation failure is only checked by assertion; there is no runtime
 * error path for it in the visible code. */
1107 IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );
\r
1109 /* Schedule the new job with a delay. */
\r
1110 taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
\r
1114 if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
\r
1116 /* Scheduling a newly-created job should never be invalid or illegal. */
\r
1117 IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER );
\r
1118 IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION );
\r
1120 IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.",
\r
1121 pOperation->pMqttConnection,
\r
1122 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
1124 IotTaskPool_strerror( taskPoolStatus ) );
\r
/* Every task pool scheduling failure is collapsed into this single MQTT
 * status code; the task pool error itself only appears in the log above. */
1126 status = IOT_MQTT_SCHEDULING_ERROR;
\r
1130 EMPTY_ELSE_MARKER;
\r
1136 /*-----------------------------------------------------------*/
\r
/**
 * @brief Search a connection's list of operations pending a response for the
 * first operation accepted by _mqttOperation_match, and unlink it from that
 * list.
 *
 * The search, the reference count handling and the list removal all happen
 * while pMqttConnection->referencesMutex is held.
 *
 * @param pMqttConnection Connection whose pendingResponse list is searched.
 * @param type Operation type placed in the match parameter.
 * @param pPacketIdentifier Packet identifier pointer placed in the match
 * parameter. May be NULL; in this function a NULL value only selects a
 * different log message. How NULL affects matching is decided by
 * _mqttOperation_match, which is not shown here.
 *
 * NOTE(review): this listing omits some original lines (braces, else lines,
 * some call arguments, the statement executed when the retry job cannot be
 * cancelled, and the return statement). The exact nesting of the branches
 * below and the returned value cannot be confirmed from here; presumably
 * pResult is returned - verify against the full file.
 */
1138 _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
\r
1139 IotMqttOperationType_t type,
\r
1140 const uint16_t * pPacketIdentifier )
\r
1142 bool waitable = false;
\r
1143 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
1144 _mqttOperation_t * pResult = NULL;
\r
1145 IotLink_t * pResultLink = NULL;
\r
1146 _operationMatchParam_t param = { 0 };
\r
1148 param.type = type;
\r
1149 param.pPacketIdentifier = pPacketIdentifier;
\r
/* The packet identifier is only dereferenced for logging when it is
 * non-NULL. */
1151 if( pPacketIdentifier != NULL )
\r
1153 IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response "
\r
1154 "with packet identifier %hu.",
\r
1156 IotMqtt_OperationType( type ),
\r
1157 *pPacketIdentifier );
\r
1161 IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.",
\r
1163 IotMqtt_OperationType( type ) );
\r
1166 /* Find and remove the first matching element in the list. */
\r
1167 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
1168 pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ),
\r
1170 _mqttOperation_match,
\r
1173 /* Check if a match was found. */
\r
1174 if( pResultLink != NULL )
\r
1176 /* Get operation pointer and check if it is waitable. */
\r
1177 pResult = IotLink_Container( _mqttOperation_t, pResultLink, link );
\r
1178 waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
\r
1180 /* Check if the matched operation is a PUBLISH with retry. If it is, cancel
\r
1181 * the retry job. */
\r
1182 if( pResult->u.operation.periodic.retry.limit > 0 )
\r
1184 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
1188 /* If the retry job could not be canceled, then it is currently
\r
1189 * executing. Ignore the operation. */
\r
1190 if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
\r
1196 /* Check job reference counts. A waitable operation should have a
\r
1197 * count of 2; a non-waitable operation should have a count of 1. */
\r
1198 IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) );
\r
1203 /* An operation with no retry in the pending responses list should
\r
1204 * always have a job reference of 1. */
\r
1205 IotMqtt_Assert( pResult->u.operation.jobReference == 1 );
\r
1207 /* Increment job references of a waitable operation to prevent Wait from
\r
1208 * destroying this operation if it times out. */
\r
1209 if( waitable == true )
\r
1211 ( pResult->u.operation.jobReference )++;
\r
1213 IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.",
\r
1215 IotMqtt_OperationType( type ),
\r
1217 ( long int ) ( pResult->u.operation.jobReference - 1 ),
\r
1218 ( long int ) ( pResult->u.operation.jobReference ) );
\r
1224 EMPTY_ELSE_MARKER;
\r
/* pResult is tested here rather than pResultLink. NOTE(review): the comment
 * above says an operation whose retry job could not be cancelled is ignored,
 * so presumably pResult is reset to NULL on that path; that statement is not
 * visible in this listing - confirm. */
1227 if( pResult != NULL )
\r
1229 IotLogDebug( "(MQTT connection %p) Found operation %s.",
\r
1231 IotMqtt_OperationType( type ) );
\r
1233 /* Remove the matched operation from the list. */
\r
1234 IotListDouble_Remove( &( pResult->link ) );
\r
1238 IotLogDebug( "(MQTT connection %p) Operation %s not found.",
\r
1240 IotMqtt_OperationType( type ) );
\r
1243 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
1248 /*-----------------------------------------------------------*/
\r
/**
 * @brief Signal that an operation has completed: schedule the user callback
 * of a non-waitable operation, or post the semaphore of a waitable one.
 *
 * Visible steps:
 * - For a SUBSCRIBE operation, the status is switched on and
 *   _IotMqtt_RemoveSubscriptionByPacket is called for the operation's packet
 *   identifier. The visible case labels are IOT_MQTT_SUCCESS and
 *   IOT_MQTT_SERVER_REFUSED.
 * - For a non-waitable operation with a callback function, and while
 *   holding pMqttConnection->referencesMutex,
 *   _IotMqtt_ProcessCompletedOperation is scheduled through
 *   _IotMqtt_ScheduleOperation. On success the operation is unlinked if it
 *   is currently linked, then inserted at the head of the connection's
 *   pendingProcessing list.
 * - If status is IOT_MQTT_SCHEDULING_ERROR afterwards, the operation's
 *   reference count is decremented and the operation is destroyed when
 *   _IotMqtt_DecrementOperationReferences returns true.
 * - For a waitable operation, notify.waitSemaphore is posted.
 *
 * @param pOperation The completed operation.
 *
 * NOTE(review): this listing omits some original lines (braces, else, break
 * and default lines, and some call arguments), so the exact nesting cannot
 * be confirmed from here. In particular, which case label guards the
 * subscription removal is not visible; presumably it is the default case,
 * per the existing comment about failed SUBSCRIBE operations. Whether the
 * semaphore post sits inside the scheduling-error branch is also not
 * visible. Verify both against the full file.
 *
 * NOTE(review): in this listing the comment opened on original line 1304 has
 * no visible closing line (original line 1305 is absent). Restore it from
 * the full file before compiling.
 */
1250 void _IotMqtt_Notify( _mqttOperation_t * pOperation )
\r
/* Starts as IOT_MQTT_SCHEDULING_ERROR and is only overwritten by the
 * scheduling call below, so an operation for which no callback is scheduled
 * still satisfies the status check further down. */
1252 IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR;
\r
1253 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
1255 /* Check if operation is waitable. */
\r
1256 bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
\r
1258 /* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected
\r
1259 * subscriptions are removed by the deserializer, so not removed here. */
\r
1260 if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE )
\r
1262 switch( pOperation->u.operation.status )
\r
1264 case IOT_MQTT_SUCCESS:
\r
1267 case IOT_MQTT_SERVER_REFUSED:
\r
1271 _IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection,
\r
1272 pOperation->u.operation.packetIdentifier,
\r
1279 EMPTY_ELSE_MARKER;
\r
1282 /* Schedule callback invocation for non-waitable operation. */
\r
1283 if( waitable == false )
\r
1285 /* Non-waitable operation should have job reference of 1. */
\r
1286 IotMqtt_Assert( pOperation->u.operation.jobReference == 1 );
\r
1288 /* Schedule an invocation of the callback. */
\r
1289 if( pOperation->u.operation.notify.callback.function != NULL )
\r
/* The mutex is held across both the scheduling call and the list
 * manipulation that follows it. */
1291 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
1293 status = _IotMqtt_ScheduleOperation( pOperation,
\r
1294 _IotMqtt_ProcessCompletedOperation,
\r
1297 if( status == IOT_MQTT_SUCCESS )
\r
1299 IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.",
\r
1300 pOperation->pMqttConnection,
\r
1301 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
1304 /* Place the scheduled operation back in the list of operations pending
\r
1306 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
1308 IotListDouble_Remove( &( pOperation->link ) );
\r
1312 EMPTY_ELSE_MARKER;
\r
1315 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
\r
1316 &( pOperation->link ) );
\r
1320 IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.",
\r
1321 pOperation->pMqttConnection,
\r
1322 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
1326 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
1330 EMPTY_ELSE_MARKER;
\r
1335 EMPTY_ELSE_MARKER;
\r
1338 /* Operations that weren't scheduled may be destroyed. */
\r
/* status still holds its initial IOT_MQTT_SCHEDULING_ERROR when the
 * operation is waitable, has no callback function, or could not be
 * scheduled above. */
1339 if( status == IOT_MQTT_SCHEDULING_ERROR )
\r
1341 /* Decrement reference count of operations not scheduled. */
\r
1342 if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
\r
1344 _IotMqtt_DestroyOperation( pOperation );
\r
1348 EMPTY_ELSE_MARKER;
\r
1351 /* Post to a waitable operation's semaphore. */
\r
1352 if( waitable == true )
\r
1354 IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "
\r
1355 "notified of completion.",
\r
1356 pOperation->pMqttConnection,
\r
1357 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
1360 IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );
\r
1364 EMPTY_ELSE_MARKER;
\r
/* NOTE(review): presumably the else branch of the scheduling-error check
 * above; its else line is not visible in this listing - confirm. */
1369 IotMqtt_Assert( status == IOT_MQTT_SUCCESS );
\r
1373 /*-----------------------------------------------------------*/
\r