/*
 * Amazon FreeRTOS MQTT V2.0.0
 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 * the Software, and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * http://aws.amazon.com/freertos
 * http://www.FreeRTOS.org
 */

/**
 * @file iot_mqtt_network.c
 * @brief Implements functions involving transport layer connections.
 */

/* The config header is always included first. */
#include "iot_config.h"

/* Standard includes. */
#include <string.h>

/* Error handling include. */
#include "private/iot_error.h"

/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"

/* Platform layer includes. */
#include "platform/iot_threads.h"

/*-----------------------------------------------------------*/

49 * @brief Check if an incoming packet type is valid.
\r
51 * @param[in] packetType The packet type to check.
\r
53 * @return `true` if the packet type is valid; `false` otherwise.
\r
55 static bool _incomingPacketValid( uint8_t packetType );
\r
58 * @brief Get an incoming MQTT packet from the network.
\r
60 * @param[in] pNetworkConnection Network connection to use for receive, which
\r
61 * may be different from the network connection associated with the MQTT connection.
\r
62 * @param[in] pMqttConnection The associated MQTT connection.
\r
63 * @param[out] pIncomingPacket Output parameter for the incoming packet.
\r
65 * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY or #IOT_MQTT_BAD_RESPONSE.
\r
67 static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
\r
68 const _mqttConnection_t * pMqttConnection,
\r
69 _mqttPacket_t * pIncomingPacket );
\r
72 * @brief Deserialize a packet received from the network.
\r
74 * @param[in] pMqttConnection The associated MQTT connection.
\r
75 * @param[in] pIncomingPacket The packet received from the network.
\r
77 * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY, #IOT_MQTT_NETWORK_ERROR,
\r
78 * #IOT_MQTT_SCHEDULING_ERROR, #IOT_MQTT_BAD_RESPONSE, or #IOT_MQTT_SERVER_REFUSED.
\r
80 static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
\r
81 _mqttPacket_t * pIncomingPacket );
\r
84 * @brief Send a PUBACK for a received QoS 1 PUBLISH packet.
\r
86 * @param[in] pMqttConnection Which connection the PUBACK should be sent over.
\r
87 * @param[in] packetIdentifier Which packet identifier to include in PUBACK.
\r
89 static void _sendPuback( _mqttConnection_t * pMqttConnection,
\r
90 uint16_t packetIdentifier );
\r
/*-----------------------------------------------------------*/

94 static bool _incomingPacketValid( uint8_t packetType )
\r
98 /* Check packet type. Mask out lower bits to ignore flags. */
\r
99 switch( packetType & 0xf0 )
\r
101 /* Valid incoming packet types. */
\r
102 case MQTT_PACKET_TYPE_CONNACK:
\r
103 case MQTT_PACKET_TYPE_PUBLISH:
\r
104 case MQTT_PACKET_TYPE_PUBACK:
\r
105 case MQTT_PACKET_TYPE_SUBACK:
\r
106 case MQTT_PACKET_TYPE_UNSUBACK:
\r
107 case MQTT_PACKET_TYPE_PINGRESP:
\r
110 /* Any other packet type is invalid. */
\r
/*-----------------------------------------------------------*/

121 static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
\r
122 const _mqttConnection_t * pMqttConnection,
\r
123 _mqttPacket_t * pIncomingPacket )
\r
125 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
126 size_t dataBytesRead = 0;
\r
128 /* Default functions for retrieving packet type and length. */
\r
129 uint8_t ( * getPacketType )( void *,
\r
130 const IotNetworkInterface_t * ) = _IotMqtt_GetPacketType;
\r
131 size_t ( * getRemainingLength )( void *,
\r
132 const IotNetworkInterface_t * ) = _IotMqtt_GetRemainingLength;
\r
134 /* No buffer for remaining data should be allocated. */
\r
135 IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
\r
136 IotMqtt_Assert( pIncomingPacket->remainingLength == 0 );
\r
138 /* Choose packet type and length functions. */
\r
139 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
140 if( pMqttConnection->pSerializer != NULL )
\r
142 if( pMqttConnection->pSerializer->getPacketType != NULL )
\r
144 getPacketType = pMqttConnection->pSerializer->getPacketType;
\r
151 if( pMqttConnection->pSerializer->getRemainingLength != NULL )
\r
153 getRemainingLength = pMqttConnection->pSerializer->getRemainingLength;
\r
164 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
166 /* Read the packet type, which is the first byte available. */
\r
167 pIncomingPacket->type = getPacketType( pNetworkConnection,
\r
168 pMqttConnection->pNetworkInterface );
\r
170 /* Check that the incoming packet type is valid. */
\r
171 if( _incomingPacketValid( pIncomingPacket->type ) == false )
\r
173 IotLogError( "(MQTT connection %p) Unknown packet type %02x received.",
\r
175 pIncomingPacket->type );
\r
177 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
184 /* Read the remaining length. */
\r
185 pIncomingPacket->remainingLength = getRemainingLength( pNetworkConnection,
\r
186 pMqttConnection->pNetworkInterface );
\r
188 if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )
\r
190 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
197 /* Allocate a buffer for the remaining data and read the data. */
\r
198 if( pIncomingPacket->remainingLength > 0 )
\r
200 pIncomingPacket->pRemainingData = IotMqtt_MallocMessage( pIncomingPacket->remainingLength );
\r
202 if( pIncomingPacket->pRemainingData == NULL )
\r
204 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
211 dataBytesRead = pMqttConnection->pNetworkInterface->receive( pNetworkConnection,
\r
212 pIncomingPacket->pRemainingData,
\r
213 pIncomingPacket->remainingLength );
\r
215 if( dataBytesRead != pIncomingPacket->remainingLength )
\r
217 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
229 /* Clean up on error. */
\r
230 IOT_FUNCTION_CLEANUP_BEGIN();
\r
232 if( status != IOT_MQTT_SUCCESS )
\r
234 if( pIncomingPacket->pRemainingData != NULL )
\r
236 IotMqtt_FreeMessage( pIncomingPacket->pRemainingData );
\r
248 IOT_FUNCTION_CLEANUP_END();
\r
/*-----------------------------------------------------------*/

253 static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
\r
254 _mqttPacket_t * pIncomingPacket )
\r
256 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
257 _mqttOperation_t * pOperation = NULL;
\r
259 /* Deserializer function. */
\r
260 IotMqttError_t ( * deserialize )( _mqttPacket_t * ) = NULL;
\r
262 /* A buffer for remaining data must be allocated if remaining length is not 0. */
\r
263 IotMqtt_Assert( ( pIncomingPacket->remainingLength > 0 ) ==
\r
264 ( pIncomingPacket->pRemainingData != NULL ) );
\r
266 /* Only valid packets should be given to this function. */
\r
267 IotMqtt_Assert( _incomingPacketValid( pIncomingPacket->type ) == true );
\r
269 /* Mask out the low bits of packet type to ignore flags. */
\r
270 switch( ( pIncomingPacket->type & 0xf0 ) )
\r
272 case MQTT_PACKET_TYPE_CONNACK:
\r
273 IotLogDebug( "(MQTT connection %p) CONNACK in data stream.", pMqttConnection );
\r
275 /* Choose CONNACK deserializer. */
\r
276 deserialize = _IotMqtt_DeserializeConnack;
\r
278 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
279 if( pMqttConnection->pSerializer != NULL )
\r
281 if( pMqttConnection->pSerializer->deserialize.connack != NULL )
\r
283 deserialize = pMqttConnection->pSerializer->deserialize.connack;
\r
294 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
296 /* Deserialize CONNACK and notify of result. */
\r
297 status = deserialize( pIncomingPacket );
\r
298 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
302 if( pOperation != NULL )
\r
304 pOperation->u.operation.status = status;
\r
305 _IotMqtt_Notify( pOperation );
\r
314 case MQTT_PACKET_TYPE_PUBLISH:
\r
315 IotLogDebug( "(MQTT connection %p) PUBLISH in data stream.", pMqttConnection );
\r
317 /* Allocate memory to handle the incoming PUBLISH. */
\r
318 pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );
\r
320 if( pOperation == NULL )
\r
322 IotLogWarn( "Failed to allocate memory for incoming PUBLISH." );
\r
323 status = IOT_MQTT_NO_MEMORY;
\r
329 /* Set the members of the incoming PUBLISH operation. */
\r
330 ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );
\r
331 pOperation->incomingPublish = true;
\r
332 pOperation->pMqttConnection = pMqttConnection;
\r
333 pIncomingPacket->u.pIncomingPublish = pOperation;
\r
336 /* Choose a PUBLISH deserializer. */
\r
337 deserialize = _IotMqtt_DeserializePublish;
\r
339 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
340 if( pMqttConnection->pSerializer != NULL )
\r
342 if( pMqttConnection->pSerializer->deserialize.publish != NULL )
\r
344 deserialize = pMqttConnection->pSerializer->deserialize.publish;
\r
355 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
357 /* Deserialize incoming PUBLISH. */
\r
358 status = deserialize( pIncomingPacket );
\r
360 if( status == IOT_MQTT_SUCCESS )
\r
362 /* Send a PUBACK for QoS 1 PUBLISH. */
\r
363 if( pOperation->u.publish.publishInfo.qos == IOT_MQTT_QOS_1 )
\r
365 _sendPuback( pMqttConnection, pIncomingPacket->packetIdentifier );
\r
372 /* Transfer ownership of the received MQTT packet to the PUBLISH operation. */
\r
373 pOperation->u.publish.pReceivedData = pIncomingPacket->pRemainingData;
\r
374 pIncomingPacket->pRemainingData = NULL;
\r
376 /* Add the PUBLISH to the list of operations pending processing. */
\r
377 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
378 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
\r
379 &( pOperation->link ) );
\r
380 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
382 /* Increment the MQTT connection reference count before scheduling an
\r
383 * incoming PUBLISH. */
\r
384 if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == true )
\r
386 /* Schedule PUBLISH for callback invocation. */
\r
387 status = _IotMqtt_ScheduleOperation( pOperation, _IotMqtt_ProcessIncomingPublish, 0 );
\r
391 status = IOT_MQTT_NETWORK_ERROR;
\r
399 /* Free PUBLISH operation on error. */
\r
400 if( status != IOT_MQTT_SUCCESS )
\r
402 /* Check ownership of the received MQTT packet. */
\r
403 if( pOperation->u.publish.pReceivedData != NULL )
\r
405 /* Retrieve the pointer MQTT packet pointer so it may be freed later. */
\r
406 IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
\r
407 pIncomingPacket->pRemainingData = ( uint8_t * ) pOperation->u.publish.pReceivedData;
\r
414 /* Remove operation from pending processing list. */
\r
415 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
417 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
419 IotListDouble_Remove( &( pOperation->link ) );
\r
426 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
428 IotMqtt_Assert( pOperation != NULL );
\r
429 IotMqtt_FreeOperation( pOperation );
\r
438 case MQTT_PACKET_TYPE_PUBACK:
\r
439 IotLogDebug( "(MQTT connection %p) PUBACK in data stream.", pMqttConnection );
\r
441 /* Choose PUBACK deserializer. */
\r
442 deserialize = _IotMqtt_DeserializePuback;
\r
444 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
445 if( pMqttConnection->pSerializer != NULL )
\r
447 if( pMqttConnection->pSerializer->deserialize.puback != NULL )
\r
449 deserialize = pMqttConnection->pSerializer->deserialize.puback;
\r
460 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
462 /* Deserialize PUBACK and notify of result. */
\r
463 status = deserialize( pIncomingPacket );
\r
464 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
465 IOT_MQTT_PUBLISH_TO_SERVER,
\r
466 &( pIncomingPacket->packetIdentifier ) );
\r
468 if( pOperation != NULL )
\r
470 pOperation->u.operation.status = status;
\r
471 _IotMqtt_Notify( pOperation );
\r
480 case MQTT_PACKET_TYPE_SUBACK:
\r
481 IotLogDebug( "(MQTT connection %p) SUBACK in data stream.", pMqttConnection );
\r
483 /* Choose SUBACK deserializer. */
\r
484 deserialize = _IotMqtt_DeserializeSuback;
\r
486 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
487 if( pMqttConnection->pSerializer != NULL )
\r
489 if( pMqttConnection->pSerializer->deserialize.suback != NULL )
\r
491 deserialize = pMqttConnection->pSerializer->deserialize.suback;
\r
502 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
504 /* Deserialize SUBACK and notify of result. */
\r
505 pIncomingPacket->u.pMqttConnection = pMqttConnection;
\r
506 status = deserialize( pIncomingPacket );
\r
507 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
508 IOT_MQTT_SUBSCRIBE,
\r
509 &( pIncomingPacket->packetIdentifier ) );
\r
511 if( pOperation != NULL )
\r
513 pOperation->u.operation.status = status;
\r
514 _IotMqtt_Notify( pOperation );
\r
523 case MQTT_PACKET_TYPE_UNSUBACK:
\r
524 IotLogDebug( "(MQTT connection %p) UNSUBACK in data stream.", pMqttConnection );
\r
526 /* Choose UNSUBACK deserializer. */
\r
527 deserialize = _IotMqtt_DeserializeUnsuback;
\r
529 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
530 if( pMqttConnection->pSerializer != NULL )
\r
532 if( pMqttConnection->pSerializer->deserialize.unsuback != NULL )
\r
534 deserialize = pMqttConnection->pSerializer->deserialize.unsuback;
\r
545 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
547 /* Deserialize UNSUBACK and notify of result. */
\r
548 status = deserialize( pIncomingPacket );
\r
549 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
550 IOT_MQTT_UNSUBSCRIBE,
\r
551 &( pIncomingPacket->packetIdentifier ) );
\r
553 if( pOperation != NULL )
\r
555 pOperation->u.operation.status = status;
\r
556 _IotMqtt_Notify( pOperation );
\r
566 /* The only remaining valid type is PINGRESP. */
\r
567 IotMqtt_Assert( ( pIncomingPacket->type & 0xf0 ) == MQTT_PACKET_TYPE_PINGRESP );
\r
568 IotLogDebug( "(MQTT connection %p) PINGRESP in data stream.", pMqttConnection );
\r
570 /* Choose PINGRESP deserializer. */
\r
571 deserialize = _IotMqtt_DeserializePingresp;
\r
573 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
574 if( pMqttConnection->pSerializer != NULL )
\r
576 if( pMqttConnection->pSerializer->deserialize.pingresp != NULL )
\r
578 deserialize = pMqttConnection->pSerializer->deserialize.pingresp;
\r
589 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
591 /* Deserialize PINGRESP. */
\r
592 status = deserialize( pIncomingPacket );
\r
594 if( status == IOT_MQTT_SUCCESS )
\r
596 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
598 if( pMqttConnection->keepAliveFailure == false )
\r
600 IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
\r
605 IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
\r
608 pMqttConnection->keepAliveFailure = false;
\r
611 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
621 if( status != IOT_MQTT_SUCCESS )
\r
623 IotLogError( "(MQTT connection %p) Packet parser status %s.",
\r
625 IotMqtt_strerror( status ) );
\r
/*-----------------------------------------------------------*/

637 static void _sendPuback( _mqttConnection_t * pMqttConnection,
\r
638 uint16_t packetIdentifier )
\r
640 IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
\r
641 uint8_t * pPuback = NULL;
\r
642 size_t pubackSize = 0, bytesSent = 0;
\r
644 /* Default PUBACK serializer and free packet functions. */
\r
645 IotMqttError_t ( * serializePuback )( uint16_t,
\r
647 size_t * ) = _IotMqtt_SerializePuback;
\r
648 void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
\r
650 IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",
\r
652 packetIdentifier );
\r
654 /* Choose PUBACK serializer and free packet functions. */
\r
655 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
656 if( pMqttConnection->pSerializer != NULL )
\r
658 if( pMqttConnection->pSerializer->serialize.puback != NULL )
\r
660 serializePuback = pMqttConnection->pSerializer->serialize.puback;
\r
671 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
672 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
673 if( pMqttConnection->pSerializer != NULL )
\r
675 if( pMqttConnection->pSerializer->freePacket != NULL )
\r
677 freePacket = pMqttConnection->pSerializer->freePacket;
\r
688 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
690 /* Generate a PUBACK packet from the packet identifier. */
\r
691 serializeStatus = serializePuback( packetIdentifier,
\r
695 if( serializeStatus != IOT_MQTT_SUCCESS )
\r
697 IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "
\r
698 "received PUBLISH %hu.",
\r
700 packetIdentifier );
\r
704 bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
\r
708 if( bytesSent != pubackSize )
\r
710 IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"
\r
713 packetIdentifier );
\r
717 IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",
\r
719 packetIdentifier );
\r
722 freePacket( pPuback );
\r
/*-----------------------------------------------------------*/

728 bool _IotMqtt_GetNextByte( void * pNetworkConnection,
\r
729 const IotNetworkInterface_t * pNetworkInterface,
\r
730 uint8_t * pIncomingByte )
\r
732 bool status = false;
\r
733 uint8_t incomingByte = 0;
\r
734 size_t bytesReceived = 0;
\r
736 /* Attempt to read 1 byte. */
\r
737 bytesReceived = pNetworkInterface->receive( pNetworkConnection,
\r
741 /* Set the output parameter and return success if 1 byte was read. */
\r
742 if( bytesReceived == 1 )
\r
744 *pIncomingByte = incomingByte;
\r
749 /* Network receive must return 0 on failure. */
\r
750 IotMqtt_Assert( bytesReceived == 0 );
\r
/*-----------------------------------------------------------*/

758 void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason,
\r
759 _mqttConnection_t * pMqttConnection )
\r
761 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
762 IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;
\r
763 IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };
\r
765 /* Mark the MQTT connection as disconnected and the keep-alive as failed. */
\r
766 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
767 pMqttConnection->disconnected = true;
\r
768 pMqttConnection->keepAliveFailure = true;
\r
770 if( pMqttConnection->keepAliveMs != 0 )
\r
772 /* Keep-alive must have a PINGREQ allocated. */
\r
773 IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );
\r
774 IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );
\r
776 /* PINGREQ provides a reference to the connection, so reference count must
\r
778 IotMqtt_Assert( pMqttConnection->references > 0 );
\r
780 /* Attempt to cancel the keep-alive job. */
\r
781 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
782 pMqttConnection->keepAliveJob,
\r
785 /* If the keep-alive job was not canceled, it must be already executing.
\r
786 * Any other return value is invalid. */
\r
787 IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
\r
788 ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
\r
790 /* Clean up keep-alive if its job was successfully canceled. Otherwise,
\r
791 * the executing keep-alive job will clean up itself. */
\r
792 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
794 /* Clean up PINGREQ packet and job. */
\r
795 _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
\r
797 /* Clear data about the keep-alive. */
\r
798 pMqttConnection->keepAliveMs = 0;
\r
799 pMqttConnection->pPingreqPacket = NULL;
\r
800 pMqttConnection->pingreqPacketSize = 0;
\r
802 /* Keep-alive is cleaned up; decrement reference count. Since this
\r
803 * function must be followed with a call to DISCONNECT, a check to
\r
804 * destroy the connection is not done here. */
\r
805 pMqttConnection->references--;
\r
807 IotLogDebug( "(MQTT connection %p) Keep-alive job canceled and cleaned up.",
\r
820 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
822 /* Close the network connection. */
\r
823 if( pMqttConnection->pNetworkInterface->close != NULL )
\r
825 closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );
\r
827 if( closeStatus == IOT_NETWORK_SUCCESS )
\r
829 IotLogInfo( "(MQTT connection %p) Network connection closed.", pMqttConnection );
\r
833 IotLogWarn( "(MQTT connection %p) Failed to close network connection, error %d.",
\r
840 IotLogWarn( "(MQTT connection %p) No network close function was set. Network connection"
\r
841 " not closed.", pMqttConnection );
\r
844 /* Invoke the disconnect callback. */
\r
845 if( pMqttConnection->disconnectCallback.function != NULL )
\r
847 /* Set the members of the callback parameter. */
\r
848 callbackParam.mqttConnection = pMqttConnection;
\r
849 callbackParam.u.disconnectReason = disconnectReason;
\r
851 pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,
\r
/*-----------------------------------------------------------*/

862 void IotMqtt_ReceiveCallback( void * pNetworkConnection,
\r
863 void * pReceiveContext )
\r
865 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
866 _mqttPacket_t incomingPacket = { .u.pMqttConnection = NULL };
\r
868 /* Cast context to correct type. */
\r
869 _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pReceiveContext;
\r
871 /* Read an MQTT packet from the network. */
\r
872 status = _getIncomingPacket( pNetworkConnection,
\r
876 if( status == IOT_MQTT_SUCCESS )
\r
878 /* Deserialize the received packet. */
\r
879 status = _deserializeIncomingPacket( pMqttConnection,
\r
882 /* Free any buffers allocated for the MQTT packet. */
\r
883 if( incomingPacket.pRemainingData != NULL )
\r
885 IotMqtt_FreeMessage( incomingPacket.pRemainingData );
\r
897 /* Close the network connection on a bad response. */
\r
898 if( status == IOT_MQTT_BAD_RESPONSE )
\r
900 IotLogError( "(MQTT connection %p) Error processing incoming data. Closing connection.",
\r
903 _IotMqtt_CloseNetworkConnection( IOT_MQTT_BAD_PACKET_RECEIVED,
\r
/*-----------------------------------------------------------*/