/*
 * Amazon FreeRTOS MQTT V2.0.0
 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 * the Software, and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * http://aws.amazon.com/freertos
 * http://www.FreeRTOS.org
 */

/**
 * @file iot_mqtt_operation.c
 * @brief Implements functions that process MQTT operations.
 */
\r
/* The config header is always included first. */
#include "iot_config.h"

/* Standard includes. */
#include <string.h>

/* Error handling include. */
#include "private/iot_error.h"

/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"

/* Platform layer includes. */
#include "platform/iot_clock.h"
#include "platform/iot_threads.h"
\r
/*-----------------------------------------------------------*/
\r
50 * @brief First parameter to #_mqttOperation_match.
\r
52 typedef struct _operationMatchParam
\r
54 IotMqttOperationType_t type; /**< @brief The type of operation to look for. */
\r
55 const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation.
\r
56 * Set to `NULL` to ignore packet identifier. */
\r
57 } _operationMatchParam_t;
\r
/*-----------------------------------------------------------*/
\r
62 * @brief Match an MQTT operation by type and packet identifier.
\r
64 * @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t.
\r
65 * @param[in] pMatch Pointer to an #_operationMatchParam_t.
\r
67 * @return `true` if the operation matches the parameters in `pArgument`; `false`
\r
70 static bool _mqttOperation_match( const IotLink_t * pOperationLink,
\r
74 * @brief Check if an operation with retry has exceeded its retry limit.
\r
76 * If a PUBLISH operation is available for retry, this function also sets any
\r
77 * necessary DUP flags.
\r
79 * @param[in] pOperation The operation to check.
\r
81 * @return `true` if the operation may be retried; `false` otherwise.
\r
83 static bool _checkRetryLimit( _mqttOperation_t * pOperation );
\r
86 * @brief Schedule the next send of an operation with retry.
\r
88 * @param[in] pOperation The operation to schedule.
\r
90 * @return `true` if the reschedule succeeded; `false` otherwise.
\r
92 static bool _scheduleNextRetry( _mqttOperation_t * pOperation );
\r
/*-----------------------------------------------------------*/
\r
96 static bool _mqttOperation_match( const IotLink_t * pOperationLink,
\r
101 /* Because this function is called from a container function, the given link
\r
102 * must never be NULL. */
\r
103 IotMqtt_Assert( pOperationLink != NULL );
\r
105 _mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t,
\r
108 _operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch;
\r
110 /* Check for matching operations. */
\r
111 if( pParam->type == pOperation->u.operation.type )
\r
113 /* Check for matching packet identifiers. */
\r
114 if( pParam->pPacketIdentifier == NULL )
\r
120 match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier );
\r
/*-----------------------------------------------------------*/
\r
133 static bool _checkRetryLimit( _mqttOperation_t * pOperation )
\r
135 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
136 bool status = true;
\r
138 /* Choose a set DUP function. */
\r
139 void ( * publishSetDup )( uint8_t *,
\r
141 uint16_t * ) = _IotMqtt_PublishSetDup;
\r
143 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
144 if( pMqttConnection->pSerializer != NULL )
\r
146 if( pMqttConnection->pSerializer->serialize.publishSetDup != NULL )
\r
148 publishSetDup = pMqttConnection->pSerializer->serialize.publishSetDup;
\r
159 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
161 /* Only PUBLISH may be retried. */
\r
162 IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );
\r
164 /* Check if the retry limit is exhausted. */
\r
165 if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )
\r
167 /* The retry count may be at most one more than the retry limit, which
\r
168 * accounts for the final check for a PUBACK. */
\r
169 IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 );
\r
171 IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",
\r
174 pOperation->u.operation.retry.limit );
\r
178 /* Check if this is the first retry. */
\r
179 else if( pOperation->u.operation.retry.count == 1 )
\r
181 /* Always set the DUP flag on the first retry. */
\r
182 publishSetDup( pOperation->u.operation.pMqttPacket,
\r
183 pOperation->u.operation.pPacketIdentifierHigh,
\r
184 &( pOperation->u.operation.packetIdentifier ) );
\r
188 /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet
\r
189 * identifier) must be reset on every retry. */
\r
190 if( pMqttConnection->awsIotMqttMode == true )
\r
192 publishSetDup( pOperation->u.operation.pMqttPacket,
\r
193 pOperation->u.operation.pPacketIdentifierHigh,
\r
194 &( pOperation->u.operation.packetIdentifier ) );
\r
/*-----------------------------------------------------------*/
\r
207 static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
\r
209 bool firstRetry = false;
\r
210 uint32_t scheduleDelay = 0;
\r
211 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
212 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
214 /* This function should never be called with retry count greater than
\r
216 IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit );
\r
218 /* Increment the retry count. */
\r
219 ( pOperation->u.operation.retry.count )++;
\r
221 /* Check for a response shortly for the final retry. Otherwise, calculate the
\r
222 * next retry period. */
\r
223 if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )
\r
225 scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;
\r
227 IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check "
\r
228 "for response in %d ms.",
\r
231 IOT_MQTT_RESPONSE_WAIT_MS );
\r
235 scheduleDelay = pOperation->u.operation.retry.nextPeriod;
\r
237 /* Double the retry period, subject to a ceiling value. */
\r
238 pOperation->u.operation.retry.nextPeriod *= 2;
\r
240 if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING )
\r
242 pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING;
\r
249 IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",
\r
252 ( unsigned long ) pOperation->u.operation.retry.count,
\r
253 ( unsigned long ) pOperation->u.operation.retry.limit,
\r
254 ( unsigned long ) scheduleDelay );
\r
256 /* Check if this is the first retry. */
\r
257 firstRetry = ( pOperation->u.operation.retry.count == 1 );
\r
259 /* On the first retry, the PUBLISH will be moved from the pending processing
\r
260 * list to the pending responses list. Lock the connection references mutex
\r
261 * to manipulate the lists. */
\r
262 if( firstRetry == true )
\r
264 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
272 /* Reschedule the PUBLISH for another send. */
\r
273 status = _IotMqtt_ScheduleOperation( pOperation,
\r
274 _IotMqtt_ProcessSend,
\r
277 /* Check for successful reschedule. */
\r
278 if( status == IOT_MQTT_SUCCESS )
\r
280 /* Move a successfully rescheduled PUBLISH from the pending processing
\r
281 * list to the pending responses list on the first retry. */
\r
282 if( firstRetry == true )
\r
284 /* Operation must be linked. */
\r
285 IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) == true );
\r
287 /* Transfer to pending response list. */
\r
288 IotListDouble_Remove( &( pOperation->link ) );
\r
289 IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),
\r
290 &( pOperation->link ) );
\r
302 /* The references mutex only needs to be unlocked on the first retry, since
\r
303 * only the first retry manipulates the connection lists. */
\r
304 if( firstRetry == true )
\r
306 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
313 return( status == IOT_MQTT_SUCCESS );
\r
/*-----------------------------------------------------------*/
\r
318 IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
\r
320 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
321 _mqttOperation_t ** pNewOperation )
\r
323 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
324 bool decrementOnError = false;
\r
325 _mqttOperation_t * pOperation = NULL;
\r
326 bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );
\r
328 /* If the waitable flag is set, make sure that there's no callback. */
\r
329 if( waitable == true )
\r
331 if( pCallbackInfo != NULL )
\r
333 IotLogError( "Callback should not be set for a waitable operation." );
\r
335 return IOT_MQTT_BAD_PARAMETER;
\r
347 IotLogDebug( "(MQTT connection %p) Creating new operation record.",
\r
350 /* Increment the reference count for the MQTT connection when creating a new
\r
352 if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false )
\r
354 IotLogError( "(MQTT connection %p) New operation record cannot be created"
\r
355 " for a closed connection",
\r
358 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
\r
362 /* Reference count will need to be decremented on error. */
\r
363 decrementOnError = true;
\r
366 /* Allocate memory for a new operation. */
\r
367 pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );
\r
369 if( pOperation == NULL )
\r
371 IotLogError( "(MQTT connection %p) Failed to allocate memory for new "
\r
372 "operation record.",
\r
375 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
379 /* Clear the operation data. */
\r
380 ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );
\r
382 /* Initialize some members of the new operation. */
\r
383 pOperation->pMqttConnection = pMqttConnection;
\r
384 pOperation->u.operation.jobReference = 1;
\r
385 pOperation->u.operation.flags = flags;
\r
386 pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING;
\r
389 /* Check if the waitable flag is set. If it is, create a semaphore to
\r
391 if( waitable == true )
\r
393 /* Create a semaphore to wait on for a waitable operation. */
\r
394 if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false )
\r
396 IotLogError( "(MQTT connection %p) Failed to create semaphore for "
\r
397 "waitable operation.",
\r
400 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
404 /* A waitable operation is created with an additional reference for the
\r
405 * Wait function. */
\r
406 ( pOperation->u.operation.jobReference )++;
\r
411 /* If the waitable flag isn't set but a callback is, copy the callback
\r
413 if( pCallbackInfo != NULL )
\r
415 pOperation->u.operation.notify.callback = *pCallbackInfo;
\r
423 /* Add this operation to the MQTT connection's operation list. */
\r
424 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
425 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
\r
426 &( pOperation->link ) );
\r
427 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
429 /* Set the output parameter. */
\r
430 *pNewOperation = pOperation;
\r
432 /* Clean up operation and decrement reference count if this function failed. */
\r
433 IOT_FUNCTION_CLEANUP_BEGIN();
\r
435 if( status != IOT_MQTT_SUCCESS )
\r
437 if( decrementOnError == true )
\r
439 _IotMqtt_DecrementConnectionReferences( pMqttConnection );
\r
446 if( pOperation != NULL )
\r
448 IotMqtt_FreeOperation( pOperation );
\r
460 IOT_FUNCTION_CLEANUP_END();
\r
/*-----------------------------------------------------------*/
\r
465 bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
\r
468 bool destroyOperation = false;
\r
469 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
470 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
472 /* Attempt to cancel the operation's job. */
\r
473 if( cancelJob == true )
\r
475 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
479 /* If the operation's job was not canceled, it must be already executing.
\r
480 * Any other return value is invalid. */
\r
481 IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
\r
482 ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
\r
484 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
486 IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.",
\r
488 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
501 /* Decrement job reference count. */
\r
502 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
504 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
505 pOperation->u.operation.jobReference--;
\r
507 IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed"
\r
508 " from %ld to %ld.",
\r
510 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
512 pOperation->u.operation.jobReference + 1,
\r
513 pOperation->u.operation.jobReference );
\r
515 /* The job reference count must be 0 or 1 after the decrement. */
\r
516 IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||
\r
517 ( pOperation->u.operation.jobReference == 1 ) );
\r
519 /* This operation may be destroyed if its reference count is 0. */
\r
520 if( pOperation->u.operation.jobReference == 0 )
\r
522 destroyOperation = true;
\r
529 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
536 return destroyOperation;
\r
/*-----------------------------------------------------------*/
\r
541 void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation )
\r
543 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
545 /* Default free packet function. */
\r
546 void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
\r
548 IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.",
\r
550 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
553 /* The job reference count must be between 0 and 2. */
\r
554 IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) &&
\r
555 ( pOperation->u.operation.jobReference <= 2 ) );
\r
557 /* Jobs to be destroyed should be removed from the MQTT connection's
\r
559 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
561 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
563 IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.",
\r
565 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
569 IotListDouble_Remove( &( pOperation->link ) );
\r
573 IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.",
\r
575 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
579 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
581 /* Free any allocated MQTT packet. */
\r
582 if( pOperation->u.operation.pMqttPacket != NULL )
\r
584 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
585 if( pMqttConnection->pSerializer != NULL )
\r
587 if( pMqttConnection->pSerializer->freePacket != NULL )
\r
589 freePacket = pMqttConnection->pSerializer->freePacket;
\r
600 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
602 freePacket( pOperation->u.operation.pMqttPacket );
\r
604 IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.",
\r
606 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
611 IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.",
\r
613 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
617 /* Check if a wait semaphore was created for this operation. */
\r
618 if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
\r
620 IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) );
\r
622 IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.",
\r
624 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
632 IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.",
\r
634 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
637 /* Free the memory used to hold operation data. */
\r
638 IotMqtt_FreeOperation( pOperation );
\r
640 /* Decrement the MQTT connection's reference count after destroying an
\r
642 _IotMqtt_DecrementConnectionReferences( pMqttConnection );
\r
/*-----------------------------------------------------------*/
\r
647 void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
\r
648 IotTaskPoolJob_t pKeepAliveJob,
\r
651 bool status = true;
\r
652 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
653 size_t bytesSent = 0;
\r
655 /* Retrieve the MQTT connection from the context. */
\r
656 _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;
\r
658 /* Check parameters. */
\r
659 IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );
\r
660 IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob );
\r
662 /* Check that keep-alive interval is valid. The MQTT spec states its maximum
\r
663 * value is 65,535 seconds. */
\r
664 IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 );
\r
666 /* Only two values are valid for the next keep alive job delay. */
\r
667 IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) ||
\r
668 ( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) );
\r
670 IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );
\r
672 /* Re-create the keep-alive job for rescheduling. This should never fail. */
\r
673 taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
\r
675 IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ),
\r
677 IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );
\r
679 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
681 /* Determine whether to send a PINGREQ or check for PINGRESP. */
\r
682 if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs )
\r
684 IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );
\r
686 /* Because PINGREQ may be used to keep the MQTT connection alive, it is
\r
687 * more important than other operations. Bypass the queue of jobs for
\r
688 * operations by directly sending the PINGREQ in this job. */
\r
689 bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
\r
690 pMqttConnection->pPingreqPacket,
\r
691 pMqttConnection->pingreqPacketSize );
\r
693 if( bytesSent != pMqttConnection->pingreqPacketSize )
\r
695 IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );
\r
700 /* Assume the keep-alive will fail. The network receive callback will
\r
701 * clear the failure flag upon receiving a PINGRESP. */
\r
702 pMqttConnection->keepAliveFailure = true;
\r
704 /* Schedule a check for PINGRESP. */
\r
705 pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS;
\r
707 IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",
\r
709 IOT_MQTT_RESPONSE_WAIT_MS );
\r
714 IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );
\r
716 if( pMqttConnection->keepAliveFailure == false )
\r
718 IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );
\r
720 /* PINGRESP was received. Schedule the next PINGREQ transmission. */
\r
721 pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;
\r
725 IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.",
\r
727 IOT_MQTT_RESPONSE_WAIT_MS );
\r
729 /* The network receive callback did not clear the failure flag. */
\r
734 /* When a PINGREQ is successfully sent, reschedule this job to check for a
\r
735 * response shortly. */
\r
736 if( status == true )
\r
738 taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,
\r
740 pMqttConnection->nextKeepAliveMs );
\r
742 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
744 IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.",
\r
746 IOT_MQTT_RESPONSE_WAIT_MS );
\r
750 IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.",
\r
752 IotTaskPool_strerror( taskPoolStatus ) );
\r
762 /* Close the connection on failures. */
\r
763 if( status == false )
\r
765 _IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT,
\r
773 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
/*-----------------------------------------------------------*/
\r
778 void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
\r
779 IotTaskPoolJob_t pPublishJob,
\r
782 _mqttOperation_t * pOperation = pContext;
\r
783 IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL };
\r
785 /* Check parameters. The task pool and job parameter is not used when asserts
\r
787 ( void ) pTaskPool;
\r
788 ( void ) pPublishJob;
\r
789 IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );
\r
790 IotMqtt_Assert( pOperation->incomingPublish == true );
\r
791 IotMqtt_Assert( pPublishJob == pOperation->job );
\r
793 /* Remove the operation from the pending processing list. */
\r
794 IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) );
\r
796 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
798 IotListDouble_Remove( &( pOperation->link ) );
\r
805 IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) );
\r
807 /* Process the current PUBLISH. */
\r
808 callbackParam.u.message.info = pOperation->u.publish.publishInfo;
\r
810 _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,
\r
813 /* Free any buffers associated with the current PUBLISH message. */
\r
814 if( pOperation->u.publish.pReceivedData != NULL )
\r
816 IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );
\r
823 /* Free the incoming PUBLISH operation. */
\r
824 IotMqtt_FreeOperation( pOperation );
\r
/*-----------------------------------------------------------*/
\r
829 void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
\r
830 IotTaskPoolJob_t pSendJob,
\r
833 size_t bytesSent = 0;
\r
834 bool destroyOperation = false, waitable = false, networkPending = false;
\r
835 _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
\r
836 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
838 /* Check parameters. The task pool and job parameter is not used when asserts
\r
840 ( void ) pTaskPool;
\r
842 IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );
\r
843 IotMqtt_Assert( pSendJob == pOperation->job );
\r
845 /* The given operation must have an allocated packet and be waiting for a status. */
\r
846 IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
\r
847 IotMqtt_Assert( pOperation->u.operation.packetSize != 0 );
\r
848 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
\r
850 /* Check if this operation is waitable. */
\r
851 waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
\r
853 /* Check PUBLISH retry counts and limits. */
\r
854 if( pOperation->u.operation.retry.limit > 0 )
\r
856 if( _checkRetryLimit( pOperation ) == false )
\r
858 pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE;
\r
870 /* Send an operation that is waiting for a response. */
\r
871 if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
\r
873 IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.",
\r
875 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
878 /* Transmit the MQTT packet from the operation over the network. */
\r
879 bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
\r
880 pOperation->u.operation.pMqttPacket,
\r
881 pOperation->u.operation.packetSize );
\r
883 /* Check transmission status. */
\r
884 if( bytesSent != pOperation->u.operation.packetSize )
\r
886 pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR;
\r
890 /* DISCONNECT operations are considered successful upon successful
\r
891 * transmission. In addition, non-waitable operations with no callback
\r
892 * may also be considered successful. */
\r
893 if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT )
\r
895 /* DISCONNECT operations are always waitable. */
\r
896 IotMqtt_Assert( waitable == true );
\r
898 pOperation->u.operation.status = IOT_MQTT_SUCCESS;
\r
900 else if( waitable == false )
\r
902 if( pOperation->u.operation.notify.callback.function == NULL )
\r
904 pOperation->u.operation.status = IOT_MQTT_SUCCESS;
\r
922 /* Check if this operation requires further processing. */
\r
923 if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
\r
925 /* Check if this operation should be scheduled for retransmission. */
\r
926 if( pOperation->u.operation.retry.limit > 0 )
\r
928 if( _scheduleNextRetry( pOperation ) == false )
\r
930 pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR;
\r
934 /* A successfully scheduled PUBLISH retry is awaiting a response
\r
935 * from the network. */
\r
936 networkPending = true;
\r
941 /* Decrement reference count to signal completion of send job. Check
\r
942 * if the operation should be destroyed. */
\r
943 if( waitable == true )
\r
945 destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );
\r
952 /* If the operation should not be destroyed, transfer it from the
\r
953 * pending processing to the pending response list. */
\r
954 if( destroyOperation == false )
\r
956 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
958 /* Operation must be linked. */
\r
959 IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) );
\r
961 /* Transfer to pending response list. */
\r
962 IotListDouble_Remove( &( pOperation->link ) );
\r
963 IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),
\r
964 &( pOperation->link ) );
\r
966 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
968 /* This operation is now awaiting a response from the network. */
\r
969 networkPending = true;
\r
982 /* Destroy the operation or notify of completion if necessary. */
\r
983 if( destroyOperation == true )
\r
985 _IotMqtt_DestroyOperation( pOperation );
\r
989 /* Do not check the operation status if a network response is pending,
\r
990 * since a network response could modify the status. */
\r
991 if( networkPending == false )
\r
993 /* Notify of operation completion if this job set a status. */
\r
994 if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )
\r
996 _IotMqtt_Notify( pOperation );
\r
1000 EMPTY_ELSE_MARKER;
\r
1005 EMPTY_ELSE_MARKER;
\r
/*-----------------------------------------------------------*/
\r
1012 void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,
\r
1013 IotTaskPoolJob_t pOperationJob,
\r
1016 _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
\r
1017 IotMqttCallbackParam_t callbackParam = { 0 };
\r
1019 /* Check parameters. The task pool and job parameter is not used when asserts
\r
1020 * are disabled. */
\r
1021 ( void ) pTaskPool;
\r
1022 ( void ) pOperationJob;
\r
1023 IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL );
\r
1024 IotMqtt_Assert( pOperationJob == pOperation->job );
\r
1026 /* The operation's callback function and status must be set. */
\r
1027 IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL );
\r
1028 IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );
\r
1030 callbackParam.mqttConnection = pOperation->pMqttConnection;
\r
1031 callbackParam.u.operation.type = pOperation->u.operation.type;
\r
1032 callbackParam.u.operation.reference = pOperation;
\r
1033 callbackParam.u.operation.result = pOperation->u.operation.status;
\r
1035 /* Invoke the user callback function. */
\r
1036 pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,
\r
1039 /* Attempt to destroy the operation once the user callback returns. */
\r
1040 if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
\r
1042 _IotMqtt_DestroyOperation( pOperation );
\r
1046 EMPTY_ELSE_MARKER;
\r
/*-----------------------------------------------------------*/
\r
1052 IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,
\r
1053 IotTaskPoolRoutine_t jobRoutine,
\r
1056 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
1057 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
1059 /* Check that job routine is valid. */
\r
1060 IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) ||
\r
1061 ( jobRoutine == _IotMqtt_ProcessCompletedOperation ) ||
\r
1062 ( jobRoutine == _IotMqtt_ProcessIncomingPublish ) );
\r
1064 /* Creating a new job should never fail when parameters are valid. */
\r
1065 taskPoolStatus = IotTaskPool_CreateJob( jobRoutine,
\r
1067 &( pOperation->jobStorage ),
\r
1068 &( pOperation->job ) );
\r
1069 IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );
\r
1071 /* Schedule the new job with a delay. */
\r
1072 taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
\r
1076 if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
\r
1078 /* Scheduling a newly-created job should never be invalid or illegal. */
\r
1079 IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER );
\r
1080 IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION );
\r
1082 IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.",
\r
1083 pOperation->pMqttConnection,
\r
1084 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
1086 IotTaskPool_strerror( taskPoolStatus ) );
\r
1088 status = IOT_MQTT_SCHEDULING_ERROR;
\r
1092 EMPTY_ELSE_MARKER;
\r
/*-----------------------------------------------------------*/
\r
1100 _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
\r
1101 IotMqttOperationType_t type,
\r
1102 const uint16_t * pPacketIdentifier )
\r
1104 bool waitable = false;
\r
1105 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
1106 _mqttOperation_t * pResult = NULL;
\r
1107 IotLink_t * pResultLink = NULL;
\r
1108 _operationMatchParam_t param = { .type = type, .pPacketIdentifier = pPacketIdentifier };
\r
1110 if( pPacketIdentifier != NULL )
\r
1112 IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response "
\r
1113 "with packet identifier %hu.",
\r
1115 IotMqtt_OperationType( type ),
\r
1116 *pPacketIdentifier );
\r
1120 IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.",
\r
1122 IotMqtt_OperationType( type ) );
\r
1125 /* Find and remove the first matching element in the list. */
\r
1126 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
1127 pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ),
\r
1129 _mqttOperation_match,
\r
1132 /* Check if a match was found. */
\r
1133 if( pResultLink != NULL )
\r
1135 /* Get operation pointer and check if it is waitable. */
\r
1136 pResult = IotLink_Container( _mqttOperation_t, pResultLink, link );
\r
1137 waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
\r
1139 /* Check if the matched operation is a PUBLISH with retry. If it is, cancel
\r
1140 * the retry job. */
\r
1141 if( pResult->u.operation.retry.limit > 0 )
\r
1143 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
1147 /* If the retry job could not be canceled, then it is currently
\r
1148 * executing. Ignore the operation. */
\r
1149 if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
\r
1155 /* Check job reference counts. A waitable operation should have a
\r
1156 * count of 2; a non-waitable operation should have a count of 1. */
\r
1157 IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) );
\r
1162 /* An operation with no retry in the pending responses list should
\r
1163 * always have a job reference of 1. */
\r
1164 IotMqtt_Assert( pResult->u.operation.jobReference == 1 );
\r
1166 /* Increment job references of a waitable operation to prevent Wait from
\r
1167 * destroying this operation if it times out. */
\r
1168 if( waitable == true )
\r
1170 ( pResult->u.operation.jobReference )++;
\r
1172 IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.",
\r
1174 IotMqtt_OperationType( type ),
\r
1176 ( long int ) ( pResult->u.operation.jobReference - 1 ),
\r
1177 ( long int ) ( pResult->u.operation.jobReference ) );
\r
1183 EMPTY_ELSE_MARKER;
\r
1186 if( pResult != NULL )
\r
1188 IotLogDebug( "(MQTT connection %p) Found operation %s." ,
\r
1190 IotMqtt_OperationType( type ) );
\r
1192 /* Remove the matched operation from the list. */
\r
1193 IotListDouble_Remove( &( pResult->link ) );
\r
1197 IotLogDebug( "(MQTT connection %p) Operation %s not found.",
\r
1199 IotMqtt_OperationType( type ) );
\r
1202 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
1207 /*-----------------------------------------------------------*/
\r
1209 void _IotMqtt_Notify( _mqttOperation_t * pOperation )
\r
1211 IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR;
\r
1212 _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
\r
1214 /* Check if operation is waitable. */
\r
1215 bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
\r
1217 /* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected
\r
1218 * subscriptions are removed by the deserializer, so not removed here. */
\r
1219 if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE )
\r
1221 switch( pOperation->u.operation.status )
\r
1223 case IOT_MQTT_SUCCESS:
\r
1226 case IOT_MQTT_SERVER_REFUSED:
\r
1230 _IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection,
\r
1231 pOperation->u.operation.packetIdentifier,
\r
1238 EMPTY_ELSE_MARKER;
\r
1241 /* Schedule callback invocation for non-waitable operation. */
\r
1242 if( waitable == false )
\r
1244 /* Non-waitable operation should have job reference of 1. */
\r
1245 IotMqtt_Assert( pOperation->u.operation.jobReference == 1 );
\r
1247 /* Schedule an invocation of the callback. */
\r
1248 if( pOperation->u.operation.notify.callback.function != NULL )
\r
1250 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
1252 status = _IotMqtt_ScheduleOperation( pOperation,
\r
1253 _IotMqtt_ProcessCompletedOperation,
\r
1256 if( status == IOT_MQTT_SUCCESS )
\r
1258 IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.",
\r
1259 pOperation->pMqttConnection,
\r
1260 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
1263 /* Place the scheduled operation back in the list of operations pending
\r
1265 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
1267 IotListDouble_Remove( &( pOperation->link ) );
\r
1271 EMPTY_ELSE_MARKER;
\r
1274 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
\r
1275 &( pOperation->link ) );
\r
1279 IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.",
\r
1280 pOperation->pMqttConnection,
\r
1281 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
1285 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
1289 EMPTY_ELSE_MARKER;
\r
1294 EMPTY_ELSE_MARKER;
\r
1297 /* Operations that weren't scheduled may be destroyed. */
\r
1298 if( status == IOT_MQTT_SCHEDULING_ERROR )
\r
1300 /* Decrement reference count of operations not scheduled. */
\r
1301 if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
\r
1303 _IotMqtt_DestroyOperation( pOperation );
\r
1307 EMPTY_ELSE_MARKER;
\r
1310 /* Post to a waitable operation's semaphore. */
\r
1311 if( waitable == true )
\r
1313 IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "
\r
1314 "notified of completion.",
\r
1315 pOperation->pMqttConnection,
\r
1316 IotMqtt_OperationType( pOperation->u.operation.type ),
\r
1319 IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );
\r
1323 EMPTY_ELSE_MARKER;
\r
1328 IotMqtt_Assert( status == IOT_MQTT_SUCCESS );
\r
1332 /*-----------------------------------------------------------*/
\r