3 * Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
24 * @file iot_mqtt_network.c
\r
25 * @brief Implements functions involving transport layer connections.
\r
28 /* The config header is always included first. */
\r
29 #include "iot_config.h"
\r
31 /* Standard includes. */
\r
34 /* Error handling include. */
\r
35 #include "iot_error.h"
\r
37 /* MQTT internal include. */
\r
38 #include "private/iot_mqtt_internal.h"
\r
40 /* Platform layer includes. */
\r
41 #include "platform/iot_threads.h"
\r
43 /* Atomics include. */
\r
44 #include "iot_atomic.h"
\r
46 /*-----------------------------------------------------------*/
\r
49 * @brief Check if an incoming packet type is valid.
\r
51 * @param[in] packetType The packet type to check.
\r
53 * @return `true` if the packet type is valid; `false` otherwise.
\r
55 static bool _incomingPacketValid( uint8_t packetType );
\r
58 * @brief Get an incoming MQTT packet from the network.
\r
60 * @param[in] pNetworkConnection Network connection to use for receive, which
\r
61 * may be different from the network connection associated with the MQTT connection.
\r
62 * @param[in] pMqttConnection The associated MQTT connection.
\r
63 * @param[out] pIncomingPacket Output parameter for the incoming packet.
\r
65 * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY or #IOT_MQTT_BAD_RESPONSE.
\r
67 static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
\r
68 const _mqttConnection_t * pMqttConnection,
\r
69 _mqttPacket_t * pIncomingPacket );
\r
72 * @brief Deserialize a packet received from the network.
\r
74 * @param[in] pMqttConnection The associated MQTT connection.
\r
75 * @param[in] pIncomingPacket The packet received from the network.
\r
77 * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY, #IOT_MQTT_NETWORK_ERROR,
\r
78 * #IOT_MQTT_SCHEDULING_ERROR, #IOT_MQTT_BAD_RESPONSE, or #IOT_MQTT_SERVER_REFUSED.
\r
80 static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
\r
81 _mqttPacket_t * pIncomingPacket );
\r
84 * @brief Send a PUBACK for a received QoS 1 PUBLISH packet.
\r
86 * @param[in] pMqttConnection Which connection the PUBACK should be sent over.
\r
87 * @param[in] packetIdentifier Which packet identifier to include in PUBACK.
\r
89 static void _sendPuback( _mqttConnection_t * pMqttConnection,
\r
90 uint16_t packetIdentifier );
\r
93 * @brief Flush a packet from the stream of incoming data.
\r
95 * This function is called when memory for a packet cannot be allocated. The
\r
96 * packet is flushed from the stream of incoming data so that the next packet
\r
99 * @param[in] pNetworkConnection Network connection to use for receive, which
\r
100 * may be different from the network connection associated with the MQTT connection.
\r
101 * @param[in] pMqttConnection The associated MQTT connection.
\r
102 * @param[in] length The length of the packet to flush.
\r
104 static void _flushPacket( void * pNetworkConnection,
\r
105 const _mqttConnection_t * pMqttConnection,
\r
109 * @cond DOXYGEN_IGNORE
\r
110 * Doxygen should ignore this section.
\r
112 * Declaration of local MQTT serializer override selectors
\r
114 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
115 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttGetPacketType_t,
\r
116 _getPacketTypeFunc,
\r
117 _IotMqtt_GetPacketType,
\r
119 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttGetRemainingLength_t,
\r
120 _getRemainingLengthFunc,
\r
121 _IotMqtt_GetRemainingLength,
\r
122 getRemainingLength )
\r
123 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
\r
124 _getConnackDeserializer,
\r
125 _IotMqtt_DeserializeConnack,
\r
126 deserialize.connack )
\r
127 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
\r
128 _getPublishDeserializer,
\r
129 _IotMqtt_DeserializePublish,
\r
130 deserialize.publish )
\r
131 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
\r
132 _getPubackDeserializer,
\r
133 _IotMqtt_DeserializePuback,
\r
134 deserialize.puback )
\r
135 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
\r
136 _getSubackDeserializer,
\r
137 _IotMqtt_DeserializeSuback,
\r
138 deserialize.suback )
\r
139 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
\r
140 _getUnsubackDeserializer,
\r
141 _IotMqtt_DeserializeUnsuback,
\r
142 deserialize.unsuback )
\r
143 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
\r
144 _getPingrespDeserializer,
\r
145 _IotMqtt_DeserializePingresp,
\r
146 deserialize.pingresp )
\r
147 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializePuback_t,
\r
148 _getMqttPubackSerializer,
\r
149 _IotMqtt_SerializePuback,
\r
151 _SERIALIZER_OVERRIDE_SELECTOR( IotMqttFreePacket_t,
\r
152 _getMqttFreePacketFunc,
\r
153 _IotMqtt_FreePacket,
\r
155 #else /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
156 #define _getPacketTypeFunc( pSerializer ) _IotMqtt_GetPacketType
\r
157 #define _getRemainingLengthFunc( pSerializer ) _IotMqtt_GetRemainingLength
\r
158 #define _getConnackDeserializer( pSerializer ) _IotMqtt_DeserializeConnack
\r
159 #define _getPublishDeserializer( pSerializer ) _IotMqtt_DeserializePublish
\r
160 #define _getPubackDeserializer( pSerializer ) _IotMqtt_DeserializePuback
\r
161 #define _getSubackDeserializer( pSerializer ) _IotMqtt_DeserializeSuback
\r
162 #define _getUnsubackDeserializer( pSerializer ) _IotMqtt_DeserializeUnsuback
\r
163 #define _getPingrespDeserializer( pSerializer ) _IotMqtt_DeserializePingresp
\r
164 #define _getMqttPubackSerializer( pSerializer ) _IotMqtt_SerializePuback
\r
165 #define _getMqttFreePacketFunc( pSerializer ) _IotMqtt_FreePacket
\r
166 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
169 /*-----------------------------------------------------------*/
\r
171 static bool _incomingPacketValid( uint8_t packetType )
\r
173 bool status = true;
\r
175 /* Check packet type. Mask out lower bits to ignore flags. */
\r
176 switch( packetType & 0xf0 )
\r
178 /* Valid incoming packet types. */
\r
179 case MQTT_PACKET_TYPE_CONNACK:
\r
180 case MQTT_PACKET_TYPE_PUBLISH:
\r
181 case MQTT_PACKET_TYPE_PUBACK:
\r
182 case MQTT_PACKET_TYPE_SUBACK:
\r
183 case MQTT_PACKET_TYPE_UNSUBACK:
\r
184 case MQTT_PACKET_TYPE_PINGRESP:
\r
187 /* Any other packet type is invalid. */
\r
196 /*-----------------------------------------------------------*/
\r
198 static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
\r
199 const _mqttConnection_t * pMqttConnection,
\r
200 _mqttPacket_t * pIncomingPacket )
\r
202 IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
\r
203 size_t dataBytesRead = 0;
\r
205 /* No buffer for remaining data should be allocated. */
\r
206 IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
\r
207 IotMqtt_Assert( pIncomingPacket->remainingLength == 0 );
\r
209 /* Read the packet type, which is the first byte available. */
\r
210 pIncomingPacket->type = _getPacketTypeFunc( pMqttConnection->pSerializer )( pNetworkConnection,
\r
211 pMqttConnection->pNetworkInterface );
\r
213 /* Check that the incoming packet type is valid. */
\r
214 if( _incomingPacketValid( pIncomingPacket->type ) == false )
\r
216 IotLogError( "(MQTT connection %p) Unknown packet type %02x received.",
\r
218 pIncomingPacket->type );
\r
220 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
227 /* Read the remaining length. */
\r
228 pIncomingPacket->remainingLength = _getRemainingLengthFunc( pMqttConnection->pSerializer )( pNetworkConnection,
\r
229 pMqttConnection->pNetworkInterface );
\r
231 if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )
\r
233 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
240 /* Allocate a buffer for the remaining data and read the data. */
\r
241 if( pIncomingPacket->remainingLength > 0 )
\r
243 pIncomingPacket->pRemainingData = IotMqtt_MallocMessage( pIncomingPacket->remainingLength );
\r
245 if( pIncomingPacket->pRemainingData == NULL )
\r
247 IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "
\r
248 "%lu for incoming packet type %lu.",
\r
250 ( unsigned long ) pIncomingPacket->remainingLength,
\r
251 ( unsigned long ) pIncomingPacket->type );
\r
253 _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );
\r
255 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
\r
262 dataBytesRead = pMqttConnection->pNetworkInterface->receive( pNetworkConnection,
\r
263 pIncomingPacket->pRemainingData,
\r
264 pIncomingPacket->remainingLength );
\r
266 if( dataBytesRead != pIncomingPacket->remainingLength )
\r
268 IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
\r
280 /* Clean up on error. */
\r
281 IOT_FUNCTION_CLEANUP_BEGIN();
\r
283 if( status != IOT_MQTT_SUCCESS )
\r
285 if( pIncomingPacket->pRemainingData != NULL )
\r
287 IotMqtt_FreeMessage( pIncomingPacket->pRemainingData );
\r
299 IOT_FUNCTION_CLEANUP_END();
\r
302 /*-----------------------------------------------------------*/
\r
304 static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
\r
305 _mqttPacket_t * pIncomingPacket )
\r
307 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
308 _mqttOperation_t * pOperation = NULL;
\r
310 /* A buffer for remaining data must be allocated if remaining length is not 0. */
\r
311 IotMqtt_Assert( ( pIncomingPacket->remainingLength > 0 ) ==
\r
312 ( pIncomingPacket->pRemainingData != NULL ) );
\r
314 /* Only valid packets should be given to this function. */
\r
315 IotMqtt_Assert( _incomingPacketValid( pIncomingPacket->type ) == true );
\r
317 /* Mask out the low bits of packet type to ignore flags. */
\r
318 switch( ( pIncomingPacket->type & 0xf0 ) )
\r
320 case MQTT_PACKET_TYPE_CONNACK:
\r
321 IotLogDebug( "(MQTT connection %p) CONNACK in data stream.", pMqttConnection );
\r
323 /* Deserialize CONNACK and notify of result. */
\r
324 status = _getConnackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
\r
326 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
330 if( pOperation != NULL )
\r
332 pOperation->u.operation.status = status;
\r
333 _IotMqtt_Notify( pOperation );
\r
342 case MQTT_PACKET_TYPE_PUBLISH:
\r
343 IotLogDebug( "(MQTT connection %p) PUBLISH in data stream.", pMqttConnection );
\r
345 /* Allocate memory to handle the incoming PUBLISH. */
\r
346 pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );
\r
348 if( pOperation == NULL )
\r
350 IotLogWarn( "Failed to allocate memory for incoming PUBLISH." );
\r
351 status = IOT_MQTT_NO_MEMORY;
\r
357 /* Set the members of the incoming PUBLISH operation. */
\r
358 ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );
\r
359 pOperation->incomingPublish = true;
\r
360 pOperation->pMqttConnection = pMqttConnection;
\r
361 pIncomingPacket->u.pIncomingPublish = pOperation;
\r
364 /* Deserialize incoming PUBLISH. */
\r
365 status = _getPublishDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
\r
367 if( status == IOT_MQTT_SUCCESS )
\r
369 /* Send a PUBACK for QoS 1 PUBLISH. */
\r
370 if( pOperation->u.publish.publishInfo.qos == IOT_MQTT_QOS_1 )
\r
372 _sendPuback( pMqttConnection, pIncomingPacket->packetIdentifier );
\r
379 /* Transfer ownership of the received MQTT packet to the PUBLISH operation. */
\r
380 pOperation->u.publish.pReceivedData = pIncomingPacket->pRemainingData;
\r
381 pIncomingPacket->pRemainingData = NULL;
\r
383 /* Add the PUBLISH to the list of operations pending processing. */
\r
384 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
385 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
\r
386 &( pOperation->link ) );
\r
387 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
389 /* Increment the MQTT connection reference count before scheduling an
\r
390 * incoming PUBLISH. */
\r
391 if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == true )
\r
393 /* Schedule PUBLISH for callback invocation. */
\r
394 status = _IotMqtt_ScheduleOperation( pOperation, _IotMqtt_ProcessIncomingPublish, 0 );
\r
398 status = IOT_MQTT_NETWORK_ERROR;
\r
406 /* Free PUBLISH operation on error. */
\r
407 if( status != IOT_MQTT_SUCCESS )
\r
409 /* Check ownership of the received MQTT packet. */
\r
410 if( pOperation->u.publish.pReceivedData != NULL )
\r
412 /* Retrieve the pointer MQTT packet pointer so it may be freed later. */
\r
413 IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
\r
414 pIncomingPacket->pRemainingData = ( uint8_t * ) pOperation->u.publish.pReceivedData;
\r
421 /* Remove operation from pending processing list. */
\r
422 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
424 if( IotLink_IsLinked( &( pOperation->link ) ) == true )
\r
426 IotListDouble_Remove( &( pOperation->link ) );
\r
433 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
435 IotMqtt_Assert( pOperation != NULL );
\r
436 IotMqtt_FreeOperation( pOperation );
\r
445 case MQTT_PACKET_TYPE_PUBACK:
\r
446 IotLogDebug( "(MQTT connection %p) PUBACK in data stream.", pMqttConnection );
\r
448 /* Deserialize PUBACK and notify of result. */
\r
449 status = _getPubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
\r
451 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
452 IOT_MQTT_PUBLISH_TO_SERVER,
\r
453 &( pIncomingPacket->packetIdentifier ) );
\r
455 if( pOperation != NULL )
\r
457 pOperation->u.operation.status = status;
\r
458 _IotMqtt_Notify( pOperation );
\r
467 case MQTT_PACKET_TYPE_SUBACK:
\r
468 IotLogDebug( "(MQTT connection %p) SUBACK in data stream.", pMqttConnection );
\r
470 /* Deserialize SUBACK and notify of result. */
\r
471 pIncomingPacket->u.pMqttConnection = pMqttConnection;
\r
473 status = _getSubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
\r
475 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
476 IOT_MQTT_SUBSCRIBE,
\r
477 &( pIncomingPacket->packetIdentifier ) );
\r
479 if( pOperation != NULL )
\r
481 pOperation->u.operation.status = status;
\r
482 _IotMqtt_Notify( pOperation );
\r
491 case MQTT_PACKET_TYPE_UNSUBACK:
\r
492 IotLogDebug( "(MQTT connection %p) UNSUBACK in data stream.", pMqttConnection );
\r
494 /* Deserialize UNSUBACK and notify of result. */
\r
495 status = _getUnsubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
\r
497 pOperation = _IotMqtt_FindOperation( pMqttConnection,
\r
498 IOT_MQTT_UNSUBSCRIBE,
\r
499 &( pIncomingPacket->packetIdentifier ) );
\r
501 if( pOperation != NULL )
\r
503 pOperation->u.operation.status = status;
\r
504 _IotMqtt_Notify( pOperation );
\r
514 /* The only remaining valid type is PINGRESP. */
\r
515 IotMqtt_Assert( ( pIncomingPacket->type & 0xf0 ) == MQTT_PACKET_TYPE_PINGRESP );
\r
517 IotLogDebug( "(MQTT connection %p) PINGRESP in data stream.", pMqttConnection );
\r
519 /* Deserialize PINGRESP. */
\r
520 status = _getPingrespDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
\r
522 if( status == IOT_MQTT_SUCCESS )
\r
524 if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),
\r
528 IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
\r
533 IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
\r
545 if( status != IOT_MQTT_SUCCESS )
\r
547 IotLogError( "(MQTT connection %p) Packet parser status %s.",
\r
549 IotMqtt_strerror( status ) );
\r
559 /*-----------------------------------------------------------*/
\r
561 static void _sendPuback( _mqttConnection_t * pMqttConnection,
\r
562 uint16_t packetIdentifier )
\r
564 IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
\r
565 _mqttOperation_t * pPubackOperation = NULL;
\r
567 IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",
\r
569 packetIdentifier );
\r
571 /* Create a PUBACK operation. */
\r
572 status = _IotMqtt_CreateOperation( pMqttConnection,
\r
575 &pPubackOperation );
\r
577 if( status != IOT_MQTT_SUCCESS )
\r
579 IOT_GOTO_CLEANUP();
\r
582 /* Set the operation type. */
\r
583 pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;
\r
585 /* Generate a PUBACK packet from the packet identifier. */
\r
586 status = _getMqttPubackSerializer( pMqttConnection->pSerializer )( packetIdentifier,
\r
587 &( pPubackOperation->u.operation.pMqttPacket ),
\r
588 &( pPubackOperation->u.operation.packetSize ) );
\r
590 if( status != IOT_MQTT_SUCCESS )
\r
592 IOT_GOTO_CLEANUP();
\r
595 /* Add the PUBACK operation to the send queue for network transmission. */
\r
596 status = _IotMqtt_ScheduleOperation( pPubackOperation,
\r
597 _IotMqtt_ProcessSend,
\r
600 if( status != IOT_MQTT_SUCCESS )
\r
602 IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",
\r
605 IOT_GOTO_CLEANUP();
\r
612 /* Clean up on error. */
\r
613 IOT_FUNCTION_CLEANUP_BEGIN();
\r
615 if( status != IOT_MQTT_SUCCESS )
\r
617 if( pPubackOperation != NULL )
\r
619 _IotMqtt_DestroyOperation( pPubackOperation );
\r
632 /*-----------------------------------------------------------*/
\r
634 static void _flushPacket( void * pNetworkConnection,
\r
635 const _mqttConnection_t * pMqttConnection,
\r
638 size_t bytesFlushed = 0;
\r
639 uint8_t receivedByte = 0;
\r
641 for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )
\r
643 ( void ) _IotMqtt_GetNextByte( pNetworkConnection,
\r
644 pMqttConnection->pNetworkInterface,
\r
649 /*-----------------------------------------------------------*/
\r
651 bool _IotMqtt_GetNextByte( void * pNetworkConnection,
\r
652 const IotNetworkInterface_t * pNetworkInterface,
\r
653 uint8_t * pIncomingByte )
\r
655 bool status = false;
\r
656 uint8_t incomingByte = 0;
\r
657 size_t bytesReceived = 0;
\r
659 /* Attempt to read 1 byte. */
\r
660 bytesReceived = pNetworkInterface->receive( pNetworkConnection,
\r
664 /* Set the output parameter and return success if 1 byte was read. */
\r
665 if( bytesReceived == 1 )
\r
667 *pIncomingByte = incomingByte;
\r
672 /* Network receive must return 0 on failure. */
\r
673 IotMqtt_Assert( bytesReceived == 0 );
\r
679 /*-----------------------------------------------------------*/
\r
681 void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason,
\r
682 _mqttConnection_t * pMqttConnection )
\r
684 IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
\r
685 IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;
\r
686 IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };
\r
687 void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;
\r
689 /* Disconnect callback function. */
\r
690 void ( * disconnectCallback )( void *,
\r
691 IotMqttCallbackParam_t * ) = NULL;
\r
693 /* Network close function. */
\r
694 IotNetworkError_t ( * closeConnection) ( IotNetworkConnection_t ) = NULL;
\r
696 /* Mark the MQTT connection as disconnected and the keep-alive as failed. */
\r
697 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
\r
698 pMqttConnection->disconnected = true;
\r
700 if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
\r
702 /* Keep-alive must have a PINGREQ allocated. */
\r
703 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );
\r
704 IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );
\r
706 /* PINGREQ provides a reference to the connection, so reference count must
\r
708 IotMqtt_Assert( pMqttConnection->references > 0 );
\r
710 /* Attempt to cancel the keep-alive job. */
\r
711 taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
\r
712 pMqttConnection->pingreq.job,
\r
715 /* Clean up keep-alive if its job was successfully canceled. Otherwise,
\r
716 * the executing keep-alive job will clean up itself. */
\r
717 if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
\r
719 /* Free the packet */
\r
720 _getMqttFreePacketFunc( pMqttConnection->pSerializer )( pMqttConnection->pingreq.u.operation.pMqttPacket );
\r
722 /* Clear data about the keep-alive. */
\r
723 pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
\r
724 pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
\r
725 pMqttConnection->pingreq.u.operation.packetSize = 0;
\r
727 /* Keep-alive is cleaned up; decrement reference count. Since this
\r
728 * function must be followed with a call to DISCONNECT, a check to
\r
729 * destroy the connection is not done here. */
\r
730 pMqttConnection->references--;
\r
732 IotLogDebug( "(MQTT connection %p) Keep-alive job canceled and cleaned up.",
\r
745 /* Copy the function pointers and contexts, as the MQTT connection may be
\r
746 * modified after the mutex is released. */
\r
747 disconnectCallback = pMqttConnection->disconnectCallback.function;
\r
748 pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;
\r
750 closeConnection = pMqttConnection->pNetworkInterface->close;
\r
751 pNetworkConnection = pMqttConnection->pNetworkConnection;
\r
753 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
\r
755 /* Close the network connection. */
\r
756 if( closeConnection != NULL )
\r
758 closeStatus = closeConnection( pNetworkConnection );
\r
760 if( closeStatus == IOT_NETWORK_SUCCESS )
\r
762 IotLogInfo( "(MQTT connection %p) Network connection closed.", pMqttConnection );
\r
766 IotLogWarn( "(MQTT connection %p) Failed to close network connection, error %d.",
\r
773 IotLogWarn( "(MQTT connection %p) No network close function was set. Network connection"
\r
774 " not closed.", pMqttConnection );
\r
777 /* Invoke the disconnect callback. */
\r
778 if( disconnectCallback != NULL )
\r
780 /* Set the members of the callback parameter. */
\r
781 callbackParam.mqttConnection = pMqttConnection;
\r
782 callbackParam.u.disconnectReason = disconnectReason;
\r
784 disconnectCallback( pDisconnectCallbackContext,
\r
793 /*-----------------------------------------------------------*/
\r
795 void IotMqtt_ReceiveCallback( IotNetworkConnection_t pNetworkConnection,
\r
796 void * pReceiveContext )
\r
798 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
799 _mqttPacket_t incomingPacket = { .u.pMqttConnection = NULL };
\r
801 /* Cast context to correct type. */
\r
802 _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pReceiveContext;
\r
804 /* Read an MQTT packet from the network. */
\r
805 status = _getIncomingPacket( pNetworkConnection,
\r
809 if( status == IOT_MQTT_SUCCESS )
\r
811 /* Deserialize the received packet. */
\r
812 status = _deserializeIncomingPacket( pMqttConnection,
\r
815 /* Free any buffers allocated for the MQTT packet. */
\r
816 if( incomingPacket.pRemainingData != NULL )
\r
818 IotMqtt_FreeMessage( incomingPacket.pRemainingData );
\r
830 /* Close the network connection on a bad response. */
\r
831 if( status == IOT_MQTT_BAD_RESPONSE )
\r
833 IotLogError( "(MQTT connection %p) Error processing incoming data. Closing connection.",
\r
836 _IotMqtt_CloseNetworkConnection( IOT_MQTT_BAD_PACKET_RECEIVED,
\r
845 /*-----------------------------------------------------------*/
\r
847 IotMqttError_t IotMqtt_GetIncomingMQTTPacketTypeAndLength( IotMqttPacketInfo_t * pIncomingPacket,
\r
848 IotMqttGetNextByte_t getNextByte,
\r
849 void * pNetworkConnection )
\r
851 IotMqttError_t status = IOT_MQTT_SUCCESS;
\r
853 /* Read the packet type, which is the first byte available. */
\r
854 if( getNextByte( pNetworkConnection, &( pIncomingPacket->type ) ) == IOT_MQTT_SUCCESS )
\r
856 /* Check that the incoming packet type is valid. */
\r
857 if( _incomingPacketValid( pIncomingPacket->type ) == false )
\r
859 IotLogError( "(MQTT connection) Unknown packet type %02x received.",
\r
860 pIncomingPacket->type );
\r
862 status = IOT_MQTT_BAD_RESPONSE;
\r
866 /* Read the remaining length. */
\r
867 pIncomingPacket->remainingLength = _IotMqtt_GetRemainingLength_Generic( pNetworkConnection,
\r
870 if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )
\r
872 status = IOT_MQTT_BAD_RESPONSE;
\r
878 status = IOT_MQTT_NETWORK_ERROR;
\r
884 /*-----------------------------------------------------------*/
\r