2 * Amazon FreeRTOS MQTT V2.0.0
\r
3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://aws.amazon.com/freertos
\r
23 * http://www.FreeRTOS.org
\r
27 * @file iot_mqtt_network.c
\r
28 * @brief Implements functions involving transport layer connections.
\r
31 /* The config header is always included first. */
\r
32 #include "iot_config.h"
\r
34 /* Standard includes. */
\r
37 /* Error handling include. */
\r
38 #include "private/iot_error.h"
\r
40 /* MQTT internal include. */
\r
41 #include "private/iot_mqtt_internal.h"
\r
43 /* Platform layer includes. */
\r
44 #include "platform/iot_threads.h"
\r
46 /* Atomics include. */
\r
47 #include "iot_atomic.h"
\r
49 /*-----------------------------------------------------------*/
\r
52 * @brief Check if an incoming packet type is valid.
\r
54 * @param[in] packetType The packet type to check.
\r
56 * @return `true` if the packet type is valid; `false` otherwise.
\r
58 static bool _incomingPacketValid( uint8_t packetType );
\r
61 * @brief Get an incoming MQTT packet from the network.
\r
63 * @param[in] pNetworkConnection Network connection to use for receive, which
\r
64 * may be different from the network connection associated with the MQTT connection.
\r
65 * @param[in] pMqttConnection The associated MQTT connection.
\r
66 * @param[out] pIncomingPacket Output parameter for the incoming packet.
\r
68 * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY or #IOT_MQTT_BAD_RESPONSE.
\r
70 static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
\r
71 const _mqttConnection_t * pMqttConnection,
\r
72 _mqttPacket_t * pIncomingPacket );
\r
75 * @brief Deserialize a packet received from the network.
\r
77 * @param[in] pMqttConnection The associated MQTT connection.
\r
78 * @param[in] pIncomingPacket The packet received from the network.
\r
80 * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY, #IOT_MQTT_NETWORK_ERROR,
\r
81 * #IOT_MQTT_SCHEDULING_ERROR, #IOT_MQTT_BAD_RESPONSE, or #IOT_MQTT_SERVER_REFUSED.
\r
83 static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
\r
84 _mqttPacket_t * pIncomingPacket );
\r
87 * @brief Send a PUBACK for a received QoS 1 PUBLISH packet.
\r
89 * @param[in] pMqttConnection Which connection the PUBACK should be sent over.
\r
90 * @param[in] packetIdentifier Which packet identifier to include in PUBACK.
\r
92 static void _sendPuback( _mqttConnection_t * pMqttConnection,
\r
93 uint16_t packetIdentifier );
\r
96 * @brief Flush a packet from the stream of incoming data.
\r
98 * This function is called when memory for a packet cannot be allocated. The
\r
99 * packet is flushed from the stream of incoming data so that the next packet
\r
102 * @param[in] pNetworkConnection Network connection to use for receive, which
\r
103 * may be different from the network connection associated with the MQTT connection.
\r
104 * @param[in] pMqttConnection The associated MQTT connection.
\r
105 * @param[in] length The length of the packet to flush.
\r
107 static void _flushPacket( void * pNetworkConnection,
\r
108 const _mqttConnection_t * pMqttConnection,
\r
111 /*-----------------------------------------------------------*/
\r
113 static bool _incomingPacketValid( uint8_t packetType )
\r
115 bool status = true;
\r
117 /* Check packet type. Mask out lower bits to ignore flags. */
\r
118 switch( packetType & 0xf0 )
\r
120 /* Valid incoming packet types. */
\r
121 case MQTT_PACKET_TYPE_CONNACK:
\r
122 case MQTT_PACKET_TYPE_PUBLISH:
\r
123 case MQTT_PACKET_TYPE_PUBACK:
\r
124 case MQTT_PACKET_TYPE_SUBACK:
\r
125 case MQTT_PACKET_TYPE_UNSUBACK:
\r
126 case MQTT_PACKET_TYPE_PINGRESP:
\r
129 /* Any other packet type is invalid. */
\r
138 /*-----------------------------------------------------------*/
\r
140 static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
\r
141 const _mqttConnection_t * pMqttConnection,
\r
142 _mqttPacket_t * pIncomingPacket )
\r
144 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
145 size_t dataBytesRead = 0;
\r
147 /* Default functions for retrieving packet type and length. */
\r
148 uint8_t ( * getPacketType )( void *,
\r
149 const IotNetworkInterface_t * ) = _IotMqtt_GetPacketType;
\r
150 size_t ( * getRemainingLength )( void *,
\r
151 const IotNetworkInterface_t * ) = _IotMqtt_GetRemainingLength;
\r
153 /* No buffer for remaining data should be allocated. */
\r
154 IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
\r
155 IotMqtt_Assert( pIncomingPacket->remainingLength == 0 );
\r
157 /* Choose packet type and length functions. */
\r
158 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
159 if( pMqttConnection->pSerializer != NULL )
\r
161 if( pMqttConnection->pSerializer->getPacketType != NULL )
\r
163 getPacketType = pMqttConnection->pSerializer->getPacketType;
\r
170 if( pMqttConnection->pSerializer->getRemainingLength != NULL )
\r
172 getRemainingLength = pMqttConnection->pSerializer->getRemainingLength;
\r
183 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
185 /* Read the packet type, which is the first byte available. */
\r
186 pIncomingPacket->type = getPacketType( pNetworkConnection,
\r
187 pMqttConnection->pNetworkInterface );
\r
189 /* Check that the incoming packet type is valid. */
\r
190 if( _incomingPacketValid( pIncomingPacket->type ) == false )
\r
192 IotLogError( "(MQTT connection %p) Unknown packet type %02x received.",
\r
194 pIncomingPacket->type );
\r
196 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
203 /* Read the remaining length. */
\r
204 pIncomingPacket->remainingLength = getRemainingLength( pNetworkConnection,
\r
205 pMqttConnection->pNetworkInterface );
\r
207 if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )
\r
209 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
216 /* Allocate a buffer for the remaining data and read the data. */
\r
217 if( pIncomingPacket->remainingLength > 0 )
\r
219 pIncomingPacket->pRemainingData = IotMqtt_MallocMessage( pIncomingPacket->remainingLength );
\r
221 if( pIncomingPacket->pRemainingData == NULL )
\r
223 IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "
\r
224 "%lu for incoming packet type %lu.",
\r
226 ( unsigned long ) pIncomingPacket->remainingLength,
\r
227 ( unsigned long ) pIncomingPacket->type );
\r
229 _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );
\r
231 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
238 dataBytesRead = pMqttConnection->pNetworkInterface->receive( pNetworkConnection,
\r
239 pIncomingPacket->pRemainingData,
\r
240 pIncomingPacket->remainingLength );
\r
242 if( dataBytesRead != pIncomingPacket->remainingLength )
\r
244 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
256 /* Clean up on error. */
\r
257 IOT_FUNCTION_CLEANUP_BEGIN();
\r
259 if( status != IOT_MQTT_SUCCESS )
\r
261 if( pIncomingPacket->pRemainingData != NULL )
\r
263 IotMqtt_FreeMessage( pIncomingPacket->pRemainingData );
\r
275 IOT_FUNCTION_CLEANUP_END();
\r
278 /*-----------------------------------------------------------*/
\r
280 static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
\r
281 _mqttPacket_t * pIncomingPacket )
\r
283 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
284 _mqttOperation_t * pOperation = NULL;
\r
286 /* Deserializer function. */
\r
287 IotMqttError_t ( * deserialize )( _mqttPacket_t * ) = NULL;
\r
289 /* A buffer for remaining data must be allocated if remaining length is not 0. */
\r
290 IotMqtt_Assert( ( pIncomingPacket->remainingLength > 0 ) ==
\r
291 ( pIncomingPacket->pRemainingData != NULL ) );
\r
293 /* Only valid packets should be given to this function. */
\r
294 IotMqtt_Assert( _incomingPacketValid( pIncomingPacket->type ) == true );
\r
296 /* Mask out the low bits of packet type to ignore flags. */
\r
297 switch( ( pIncomingPacket->type & 0xf0 ) )
\r
299 case MQTT_PACKET_TYPE_CONNACK:
\r
300 IotLogDebug( "(MQTT connection %p) CONNACK in data stream.", pMqttConnection );
\r
302 /* Choose CONNACK deserializer. */
\r
303 deserialize = _IotMqtt_DeserializeConnack;
\r
305 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
306 if( pMqttConnection->pSerializer != NULL )
\r
308 if( pMqttConnection->pSerializer->deserialize.connack != NULL )
\r
310 deserialize = pMqttConnection->pSerializer->deserialize.connack;
\r
321 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
323 /* Deserialize CONNACK and notify of result. */
\r
324 status = deserialize( pIncomingPacket );
\r
325 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
329 if( pOperation != NULL )
\r
331 pOperation->u.operation.status = status;
\r
332 _IotMqtt_Notify( pOperation );
\r
341 case MQTT_PACKET_TYPE_PUBLISH:
\r
342 IotLogDebug( "(MQTT connection %p) PUBLISH in data stream.", pMqttConnection );
\r
344 /* Allocate memory to handle the incoming PUBLISH. */
\r
345 pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );
\r
347 if( pOperation == NULL )
\r
349 IotLogWarn( "Failed to allocate memory for incoming PUBLISH." );
\r
350 status = IOT_MQTT_NO_MEMORY;
\r
356 /* Set the members of the incoming PUBLISH operation. */
\r
357 ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );
\r
358 pOperation->incomingPublish = true;
\r
359 pOperation->pMqttConnection = pMqttConnection;
\r
360 pIncomingPacket->u.pIncomingPublish = pOperation;
\r
363 /* Choose a PUBLISH deserializer. */
\r
364 deserialize = _IotMqtt_DeserializePublish;
\r
366 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
367 if( pMqttConnection->pSerializer != NULL )
\r
369 if( pMqttConnection->pSerializer->deserialize.publish != NULL )
\r
371 deserialize = pMqttConnection->pSerializer->deserialize.publish;
\r
382 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
384 /* Deserialize incoming PUBLISH. */
\r
385 status = deserialize( pIncomingPacket );
\r
387 if( status == IOT_MQTT_SUCCESS )
\r
389 /* Send a PUBACK for QoS 1 PUBLISH. */
\r
390 if( pOperation->u.publish.publishInfo.qos == IOT_MQTT_QOS_1 )
\r
392 _sendPuback( pMqttConnection, pIncomingPacket->packetIdentifier );
\r
399 /* Transfer ownership of the received MQTT packet to the PUBLISH operation. */
\r
400 pOperation->u.publish.pReceivedData = pIncomingPacket->pRemainingData;
\r
401 pIncomingPacket->pRemainingData = NULL;
\r
403 /* Add the PUBLISH to the list of operations pending processing. */
\r
404 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
405 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
\r
406 &( pOperation->link ) );
\r
407 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
409 /* Increment the MQTT connection reference count before scheduling an
\r
410 * incoming PUBLISH. */
\r
411 if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == true )
\r
413 /* Schedule PUBLISH for callback invocation. */
\r
414 status = _IotMqtt_ScheduleOperation( pOperation, _IotMqtt_ProcessIncomingPublish, 0 );
\r
418 status = IOT_MQTT_NETWORK_ERROR;
\r
426 /* Free PUBLISH operation on error. */
\r
427 if( status != IOT_MQTT_SUCCESS )
\r
429 /* Check ownership of the received MQTT packet. */
\r
430 if( pOperation->u.publish.pReceivedData != NULL )
\r
432 /* Retrieve the pointer MQTT packet pointer so it may be freed later. */
\r
433 IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
\r
434 pIncomingPacket->pRemainingData = ( uint8_t * ) pOperation->u.publish.pReceivedData;
\r
441 /* Remove operation from pending processing list. */
\r
442 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
444 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
446 IotListDouble_Remove( &( pOperation->link ) );
\r
453 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
455 IotMqtt_Assert( pOperation != NULL );
\r
456 IotMqtt_FreeOperation( pOperation );
\r
465 case MQTT_PACKET_TYPE_PUBACK:
\r
466 IotLogDebug( "(MQTT connection %p) PUBACK in data stream.", pMqttConnection );
\r
468 /* Choose PUBACK deserializer. */
\r
469 deserialize = _IotMqtt_DeserializePuback;
\r
471 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
472 if( pMqttConnection->pSerializer != NULL )
\r
474 if( pMqttConnection->pSerializer->deserialize.puback != NULL )
\r
476 deserialize = pMqttConnection->pSerializer->deserialize.puback;
\r
487 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
489 /* Deserialize PUBACK and notify of result. */
\r
490 status = deserialize( pIncomingPacket );
\r
491 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
492 IOT_MQTT_PUBLISH_TO_SERVER,
\r
493 &( pIncomingPacket->packetIdentifier ) );
\r
495 if( pOperation != NULL )
\r
497 pOperation->u.operation.status = status;
\r
498 _IotMqtt_Notify( pOperation );
\r
507 case MQTT_PACKET_TYPE_SUBACK:
\r
508 IotLogDebug( "(MQTT connection %p) SUBACK in data stream.", pMqttConnection );
\r
510 /* Choose SUBACK deserializer. */
\r
511 deserialize = _IotMqtt_DeserializeSuback;
\r
513 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
514 if( pMqttConnection->pSerializer != NULL )
\r
516 if( pMqttConnection->pSerializer->deserialize.suback != NULL )
\r
518 deserialize = pMqttConnection->pSerializer->deserialize.suback;
\r
529 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
531 /* Deserialize SUBACK and notify of result. */
\r
532 pIncomingPacket->u.pMqttConnection = pMqttConnection;
\r
533 status = deserialize( pIncomingPacket );
\r
534 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
535 IOT_MQTT_SUBSCRIBE,
\r
536 &( pIncomingPacket->packetIdentifier ) );
\r
538 if( pOperation != NULL )
\r
540 pOperation->u.operation.status = status;
\r
541 _IotMqtt_Notify( pOperation );
\r
550 case MQTT_PACKET_TYPE_UNSUBACK:
\r
551 IotLogDebug( "(MQTT connection %p) UNSUBACK in data stream.", pMqttConnection );
\r
553 /* Choose UNSUBACK deserializer. */
\r
554 deserialize = _IotMqtt_DeserializeUnsuback;
\r
556 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
557 if( pMqttConnection->pSerializer != NULL )
\r
559 if( pMqttConnection->pSerializer->deserialize.unsuback != NULL )
\r
561 deserialize = pMqttConnection->pSerializer->deserialize.unsuback;
\r
572 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
574 /* Deserialize UNSUBACK and notify of result. */
\r
575 status = deserialize( pIncomingPacket );
\r
576 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
577 IOT_MQTT_UNSUBSCRIBE,
\r
578 &( pIncomingPacket->packetIdentifier ) );
\r
580 if( pOperation != NULL )
\r
582 pOperation->u.operation.status = status;
\r
583 _IotMqtt_Notify( pOperation );
\r
593 /* The only remaining valid type is PINGRESP. */
\r
594 IotMqtt_Assert( ( pIncomingPacket->type & 0xf0 ) == MQTT_PACKET_TYPE_PINGRESP );
\r
595 IotLogDebug( "(MQTT connection %p) PINGRESP in data stream.", pMqttConnection );
\r
597 /* Choose PINGRESP deserializer. */
\r
598 deserialize = _IotMqtt_DeserializePingresp;
\r
600 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
601 if( pMqttConnection->pSerializer != NULL )
\r
603 if( pMqttConnection->pSerializer->deserialize.pingresp != NULL )
\r
605 deserialize = pMqttConnection->pSerializer->deserialize.pingresp;
\r
616 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
618 /* Deserialize PINGRESP. */
\r
619 status = deserialize( pIncomingPacket );
\r
621 if( status == IOT_MQTT_SUCCESS )
\r
623 if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),
\r
627 IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
\r
632 IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
\r
644 if( status != IOT_MQTT_SUCCESS )
\r
646 IotLogError( "(MQTT connection %p) Packet parser status %s.",
\r
648 IotMqtt_strerror( status ) );
\r
658 /*-----------------------------------------------------------*/
\r
660 static void _sendPuback( _mqttConnection_t * pMqttConnection,
\r
661 uint16_t packetIdentifier )
\r
663 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
664 _mqttOperation_t * pPubackOperation = NULL;
\r
666 /* Default PUBACK serializer function. */
\r
667 IotMqttError_t ( * serializePuback )( uint16_t,
\r
669 size_t * ) = _IotMqtt_SerializePuback;
\r
671 IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",
\r
673 packetIdentifier );
\r
675 /* Choose PUBACK serializer and free packet functions. */
\r
676 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
677 if( pMqttConnection->pSerializer != NULL )
\r
679 if( pMqttConnection->pSerializer->serialize.puback != NULL )
\r
681 serializePuback = pMqttConnection->pSerializer->serialize.puback;
\r
692 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
694 /* Create a PUBACK operation. */
\r
695 status = _IotMqtt_CreateOperation( pMqttConnection,
\r
698 &pPubackOperation );
\r
700 if( status != IOT_MQTT_SUCCESS )
\r
702 IOT_GOTO_CLEANUP();
\r
705 /* Set the operation type. */
\r
706 pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;
\r
708 /* Generate a PUBACK packet from the packet identifier. */
\r
709 status = serializePuback( packetIdentifier,
\r
710 &( pPubackOperation->u.operation.pMqttPacket ),
\r
711 &( pPubackOperation->u.operation.packetSize ) );
\r
713 if( status != IOT_MQTT_SUCCESS )
\r
715 IOT_GOTO_CLEANUP();
\r
718 /* Add the PUBACK operation to the send queue for network transmission. */
\r
719 status = _IotMqtt_ScheduleOperation( pPubackOperation,
\r
720 _IotMqtt_ProcessSend,
\r
723 if( status != IOT_MQTT_SUCCESS )
\r
725 IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",
\r
728 IOT_GOTO_CLEANUP();
\r
735 /* Clean up on error. */
\r
736 IOT_FUNCTION_CLEANUP_BEGIN();
\r
738 if( status != IOT_MQTT_SUCCESS )
\r
740 if( pPubackOperation != NULL )
\r
742 _IotMqtt_DestroyOperation( pPubackOperation );
\r
755 /*-----------------------------------------------------------*/
\r
757 static void _flushPacket( void * pNetworkConnection,
\r
758 const _mqttConnection_t * pMqttConnection,
\r
761 size_t bytesFlushed = 0;
\r
762 uint8_t receivedByte = 0;
\r
764 for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )
\r
766 ( void ) _IotMqtt_GetNextByte( pNetworkConnection,
\r
767 pMqttConnection->pNetworkInterface,
\r
772 /*-----------------------------------------------------------*/
\r
774 bool _IotMqtt_GetNextByte( void * pNetworkConnection,
\r
775 const IotNetworkInterface_t * pNetworkInterface,
\r
776 uint8_t * pIncomingByte )
\r
778 bool status = false;
\r
779 uint8_t incomingByte = 0;
\r
780 size_t bytesReceived = 0;
\r
782 /* Attempt to read 1 byte. */
\r
783 bytesReceived = pNetworkInterface->receive( pNetworkConnection,
\r
787 /* Set the output parameter and return success if 1 byte was read. */
\r
788 if( bytesReceived == 1 )
\r
790 *pIncomingByte = incomingByte;
\r
795 /* Network receive must return 0 on failure. */
\r
796 IotMqtt_Assert( bytesReceived == 0 );
\r
802 /*-----------------------------------------------------------*/
\r
804 void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason,
\r
805 _mqttConnection_t * pMqttConnection )
\r
807 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
808 IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;
\r
809 IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };
\r
810 void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;
\r
812 /* Disconnect callback function. */
\r
813 void ( * disconnectCallback )( void *,
\r
814 IotMqttCallbackParam_t * ) = NULL;
\r
816 /* Network close function. */
\r
817 IotNetworkError_t ( * closeConnection) ( void * ) = NULL;
\r
819 /* Default free packet function. */
\r
820 void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
\r
822 /* Mark the MQTT connection as disconnected and the keep-alive as failed. */
\r
823 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
824 pMqttConnection->disconnected = true;
\r
826 if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
\r
828 /* Keep-alive must have a PINGREQ allocated. */
\r
829 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );
\r
830 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );
\r
832 /* PINGREQ provides a reference to the connection, so reference count must
\r
834 IotMqtt_Assert( pMqttConnection->references > 0 );
\r
836 /* Attempt to cancel the keep-alive job. */
\r
837 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
838 pMqttConnection->pingreq.job,
\r
841 /* If the keep-alive job was not canceled, it must be already executing.
\r
842 * Any other return value is invalid. */
\r
843 IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
\r
844 ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
\r
846 /* Clean up keep-alive if its job was successfully canceled. Otherwise,
\r
847 * the executing keep-alive job will clean up itself. */
\r
848 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
850 /* Choose a function to free the packet. */
\r
851 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
852 if( pMqttConnection->pSerializer != NULL )
\r
854 if( pMqttConnection->pSerializer->freePacket != NULL )
\r
856 freePacket = pMqttConnection->pSerializer->freePacket;
\r
861 freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );
\r
863 /* Clear data about the keep-alive. */
\r
864 pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
\r
865 pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
\r
866 pMqttConnection->pingreq.u.operation.packetSize = 0;
\r
868 /* Keep-alive is cleaned up; decrement reference count. Since this
\r
869 * function must be followed with a call to DISCONNECT, a check to
\r
870 * destroy the connection is not done here. */
\r
871 pMqttConnection->references--;
\r
873 IotLogDebug( "(MQTT connection %p) Keep-alive job canceled and cleaned up.",
\r
886 /* Copy the function pointers and contexts, as the MQTT connection may be
\r
887 * modified after the mutex is released. */
\r
888 disconnectCallback = pMqttConnection->disconnectCallback.function;
\r
889 pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;
\r
891 closeConnection = pMqttConnection->pNetworkInterface->close;
\r
892 pNetworkConnection = pMqttConnection->pNetworkConnection;
\r
894 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
896 /* Close the network connection. */
\r
897 if( closeConnection != NULL )
\r
899 closeStatus = closeConnection( pNetworkConnection );
\r
901 if( closeStatus == IOT_NETWORK_SUCCESS )
\r
903 IotLogInfo( "(MQTT connection %p) Network connection closed.", pMqttConnection );
\r
907 IotLogWarn( "(MQTT connection %p) Failed to close network connection, error %d.",
\r
914 IotLogWarn( "(MQTT connection %p) No network close function was set. Network connection"
\r
915 " not closed.", pMqttConnection );
\r
918 /* Invoke the disconnect callback. */
\r
919 if( disconnectCallback != NULL )
\r
921 /* Set the members of the callback parameter. */
\r
922 callbackParam.mqttConnection = pMqttConnection;
\r
923 callbackParam.u.disconnectReason = disconnectReason;
\r
925 disconnectCallback( pDisconnectCallbackContext,
\r
934 /*-----------------------------------------------------------*/
\r
936 void IotMqtt_ReceiveCallback( void * pNetworkConnection,
\r
937 void * pReceiveContext )
\r
939 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
940 _mqttPacket_t incomingPacket = { .u.pMqttConnection = NULL };
\r
942 /* Cast context to correct type. */
\r
943 _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pReceiveContext;
\r
945 /* Read an MQTT packet from the network. */
\r
946 status = _getIncomingPacket( pNetworkConnection,
\r
950 if( status == IOT_MQTT_SUCCESS )
\r
952 /* Deserialize the received packet. */
\r
953 status = _deserializeIncomingPacket( pMqttConnection,
\r
956 /* Free any buffers allocated for the MQTT packet. */
\r
957 if( incomingPacket.pRemainingData != NULL )
\r
959 IotMqtt_FreeMessage( incomingPacket.pRemainingData );
\r
971 /* Close the network connection on a bad response. */
\r
972 if( status == IOT_MQTT_BAD_RESPONSE )
\r
974 IotLogError( "(MQTT connection %p) Error processing incoming data. Closing connection.",
\r
977 _IotMqtt_CloseNetworkConnection( IOT_MQTT_BAD_PACKET_RECEIVED,
\r
986 /*-----------------------------------------------------------*/
\r