]> git.sur5r.net Git - freertos/blob - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
Correct an err in queue.c introduced when previously updating behaviour when queue...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_network.c
1 /*\r
2  * Amazon FreeRTOS MQTT V2.0.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://aws.amazon.com/freertos\r
23  * http://www.FreeRTOS.org\r
24  */\r
25 \r
26 /**\r
27  * @file iot_mqtt_network.c\r
28  * @brief Implements functions involving transport layer connections.\r
29  */\r
30 \r
31 /* The config header is always included first. */\r
32 #include "iot_config.h"\r
33 \r
34 /* Standard includes. */\r
35 #include <string.h>\r
36 \r
37 /* Error handling include. */\r
38 #include "private/iot_error.h"\r
39 \r
40 /* MQTT internal include. */\r
41 #include "private/iot_mqtt_internal.h"\r
42 \r
43 /* Platform layer includes. */\r
44 #include "platform/iot_threads.h"\r
45 \r
46 /* Atomics include. */\r
47 #include "iot_atomic.h"\r
48 \r
49 /*-----------------------------------------------------------*/\r
50 \r
51 /**\r
52  * @brief Check if an incoming packet type is valid.\r
53  *\r
54  * @param[in] packetType The packet type to check.\r
55  *\r
56  * @return `true` if the packet type is valid; `false` otherwise.\r
57  */\r
58 static bool _incomingPacketValid( uint8_t packetType );\r
59 \r
60 /**\r
61  * @brief Get an incoming MQTT packet from the network.\r
62  *\r
63  * @param[in] pNetworkConnection Network connection to use for receive, which\r
64  * may be different from the network connection associated with the MQTT connection.\r
65  * @param[in] pMqttConnection The associated MQTT connection.\r
66  * @param[out] pIncomingPacket Output parameter for the incoming packet.\r
67  *\r
68  * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY or #IOT_MQTT_BAD_RESPONSE.\r
69  */\r
70 static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,\r
71                                           const _mqttConnection_t * pMqttConnection,\r
72                                           _mqttPacket_t * pIncomingPacket );\r
73 \r
74 /**\r
75  * @brief Deserialize a packet received from the network.\r
76  *\r
77  * @param[in] pMqttConnection The associated MQTT connection.\r
78  * @param[in] pIncomingPacket The packet received from the network.\r
79  *\r
80  * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY, #IOT_MQTT_NETWORK_ERROR,\r
81  * #IOT_MQTT_SCHEDULING_ERROR, #IOT_MQTT_BAD_RESPONSE, or #IOT_MQTT_SERVER_REFUSED.\r
82  */\r
83 static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,\r
84                                                   _mqttPacket_t * pIncomingPacket );\r
85 \r
86 /**\r
87  * @brief Send a PUBACK for a received QoS 1 PUBLISH packet.\r
88  *\r
89  * @param[in] pMqttConnection Which connection the PUBACK should be sent over.\r
90  * @param[in] packetIdentifier Which packet identifier to include in PUBACK.\r
91  */\r
92 static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
93                          uint16_t packetIdentifier );\r
94 \r
95 /**\r
96  * @brief Flush a packet from the stream of incoming data.\r
97  *\r
98  * This function is called when memory for a packet cannot be allocated. The\r
99  * packet is flushed from the stream of incoming data so that the next packet\r
100  * may be read.\r
101  *\r
102  * @param[in] pNetworkConnection Network connection to use for receive, which\r
103  * may be different from the network connection associated with the MQTT connection.\r
104  * @param[in] pMqttConnection The associated MQTT connection.\r
105  * @param[in] length The length of the packet to flush.\r
106  */\r
107 static void _flushPacket( void * pNetworkConnection,\r
108                           const _mqttConnection_t * pMqttConnection,\r
109                           size_t length );\r
110 \r
111 /*-----------------------------------------------------------*/\r
112 \r
113 static bool _incomingPacketValid( uint8_t packetType )\r
114 {\r
115     bool status = true;\r
116 \r
117     /* Check packet type. Mask out lower bits to ignore flags. */\r
118     switch( packetType & 0xf0 )\r
119     {\r
120         /* Valid incoming packet types. */\r
121         case MQTT_PACKET_TYPE_CONNACK:\r
122         case MQTT_PACKET_TYPE_PUBLISH:\r
123         case MQTT_PACKET_TYPE_PUBACK:\r
124         case MQTT_PACKET_TYPE_SUBACK:\r
125         case MQTT_PACKET_TYPE_UNSUBACK:\r
126         case MQTT_PACKET_TYPE_PINGRESP:\r
127             break;\r
128 \r
129         /* Any other packet type is invalid. */\r
130         default:\r
131             status = false;\r
132             break;\r
133     }\r
134 \r
135     return status;\r
136 }\r
137 \r
138 /*-----------------------------------------------------------*/\r
139 \r
140 static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,\r
141                                           const _mqttConnection_t * pMqttConnection,\r
142                                           _mqttPacket_t * pIncomingPacket )\r
143 {\r
144     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
145     size_t dataBytesRead = 0;\r
146 \r
147     /* Default functions for retrieving packet type and length. */\r
148     uint8_t ( * getPacketType )( void *,\r
149                                  const IotNetworkInterface_t * ) = _IotMqtt_GetPacketType;\r
150     size_t ( * getRemainingLength )( void *,\r
151                                      const IotNetworkInterface_t * ) = _IotMqtt_GetRemainingLength;\r
152 \r
153     /* No buffer for remaining data should be allocated. */\r
154     IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );\r
155     IotMqtt_Assert( pIncomingPacket->remainingLength == 0 );\r
156 \r
157     /* Choose packet type and length functions. */\r
158     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
159         if( pMqttConnection->pSerializer != NULL )\r
160         {\r
161             if( pMqttConnection->pSerializer->getPacketType != NULL )\r
162             {\r
163                 getPacketType = pMqttConnection->pSerializer->getPacketType;\r
164             }\r
165             else\r
166             {\r
167                 EMPTY_ELSE_MARKER;\r
168             }\r
169 \r
170             if( pMqttConnection->pSerializer->getRemainingLength != NULL )\r
171             {\r
172                 getRemainingLength = pMqttConnection->pSerializer->getRemainingLength;\r
173             }\r
174             else\r
175             {\r
176                 EMPTY_ELSE_MARKER;\r
177             }\r
178         }\r
179         else\r
180         {\r
181             EMPTY_ELSE_MARKER;\r
182         }\r
183     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
184 \r
185     /* Read the packet type, which is the first byte available. */\r
186     pIncomingPacket->type = getPacketType( pNetworkConnection,\r
187                                            pMqttConnection->pNetworkInterface );\r
188 \r
189     /* Check that the incoming packet type is valid. */\r
190     if( _incomingPacketValid( pIncomingPacket->type ) == false )\r
191     {\r
192         IotLogError( "(MQTT connection %p) Unknown packet type %02x received.",\r
193                      pMqttConnection,\r
194                      pIncomingPacket->type );\r
195 \r
196         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );\r
197     }\r
198     else\r
199     {\r
200         EMPTY_ELSE_MARKER;\r
201     }\r
202 \r
203     /* Read the remaining length. */\r
204     pIncomingPacket->remainingLength = getRemainingLength( pNetworkConnection,\r
205                                                            pMqttConnection->pNetworkInterface );\r
206 \r
207     if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )\r
208     {\r
209         IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );\r
210     }\r
211     else\r
212     {\r
213         EMPTY_ELSE_MARKER;\r
214     }\r
215 \r
216     /* Allocate a buffer for the remaining data and read the data. */\r
217     if( pIncomingPacket->remainingLength > 0 )\r
218     {\r
219         pIncomingPacket->pRemainingData = IotMqtt_MallocMessage( pIncomingPacket->remainingLength );\r
220 \r
221         if( pIncomingPacket->pRemainingData == NULL )\r
222         {\r
223             IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "\r
224                          "%lu for incoming packet type %lu.",\r
225                          pMqttConnection,\r
226                          ( unsigned long ) pIncomingPacket->remainingLength,\r
227                          ( unsigned long ) pIncomingPacket->type );\r
228 \r
229             _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );\r
230 \r
231             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
232         }\r
233         else\r
234         {\r
235             EMPTY_ELSE_MARKER;\r
236         }\r
237 \r
238         dataBytesRead = pMqttConnection->pNetworkInterface->receive( pNetworkConnection,\r
239                                                                      pIncomingPacket->pRemainingData,\r
240                                                                      pIncomingPacket->remainingLength );\r
241 \r
242         if( dataBytesRead != pIncomingPacket->remainingLength )\r
243         {\r
244             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );\r
245         }\r
246         else\r
247         {\r
248             EMPTY_ELSE_MARKER;\r
249         }\r
250     }\r
251     else\r
252     {\r
253         EMPTY_ELSE_MARKER;\r
254     }\r
255 \r
256     /* Clean up on error. */\r
257     IOT_FUNCTION_CLEANUP_BEGIN();\r
258 \r
259     if( status != IOT_MQTT_SUCCESS )\r
260     {\r
261         if( pIncomingPacket->pRemainingData != NULL )\r
262         {\r
263             IotMqtt_FreeMessage( pIncomingPacket->pRemainingData );\r
264         }\r
265         else\r
266         {\r
267             EMPTY_ELSE_MARKER;\r
268         }\r
269     }\r
270     else\r
271     {\r
272         EMPTY_ELSE_MARKER;\r
273     }\r
274 \r
275     IOT_FUNCTION_CLEANUP_END();\r
276 }\r
277 \r
278 /*-----------------------------------------------------------*/\r
279 \r
280 static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,\r
281                                                   _mqttPacket_t * pIncomingPacket )\r
282 {\r
283     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
284     _mqttOperation_t * pOperation = NULL;\r
285 \r
286     /* Deserializer function. */\r
287     IotMqttError_t ( * deserialize )( _mqttPacket_t * ) = NULL;\r
288 \r
289     /* A buffer for remaining data must be allocated if remaining length is not 0. */\r
290     IotMqtt_Assert( ( pIncomingPacket->remainingLength > 0 ) ==\r
291                     ( pIncomingPacket->pRemainingData != NULL ) );\r
292 \r
293     /* Only valid packets should be given to this function. */\r
294     IotMqtt_Assert( _incomingPacketValid( pIncomingPacket->type ) == true );\r
295 \r
296     /* Mask out the low bits of packet type to ignore flags. */\r
297     switch( ( pIncomingPacket->type & 0xf0 ) )\r
298     {\r
299         case MQTT_PACKET_TYPE_CONNACK:\r
300             IotLogDebug( "(MQTT connection %p) CONNACK in data stream.", pMqttConnection );\r
301 \r
302             /* Choose CONNACK deserializer. */\r
303             deserialize = _IotMqtt_DeserializeConnack;\r
304 \r
305             #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
306                 if( pMqttConnection->pSerializer != NULL )\r
307                 {\r
308                     if( pMqttConnection->pSerializer->deserialize.connack != NULL )\r
309                     {\r
310                         deserialize = pMqttConnection->pSerializer->deserialize.connack;\r
311                     }\r
312                     else\r
313                     {\r
314                         EMPTY_ELSE_MARKER;\r
315                     }\r
316                 }\r
317                 else\r
318                 {\r
319                     EMPTY_ELSE_MARKER;\r
320                 }\r
321             #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
322 \r
323             /* Deserialize CONNACK and notify of result. */\r
324             status = deserialize( pIncomingPacket );\r
325             pOperation = _IotMqtt_FindOperation( pMqttConnection,\r
326                                                  IOT_MQTT_CONNECT,\r
327                                                  NULL );\r
328 \r
329             if( pOperation != NULL )\r
330             {\r
331                 pOperation->u.operation.status = status;\r
332                 _IotMqtt_Notify( pOperation );\r
333             }\r
334             else\r
335             {\r
336                 EMPTY_ELSE_MARKER;\r
337             }\r
338 \r
339             break;\r
340 \r
341         case MQTT_PACKET_TYPE_PUBLISH:\r
342             IotLogDebug( "(MQTT connection %p) PUBLISH in data stream.", pMqttConnection );\r
343 \r
344             /* Allocate memory to handle the incoming PUBLISH. */\r
345             pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );\r
346 \r
347             if( pOperation == NULL )\r
348             {\r
349                 IotLogWarn( "Failed to allocate memory for incoming PUBLISH." );\r
350                 status = IOT_MQTT_NO_MEMORY;\r
351 \r
352                 break;\r
353             }\r
354             else\r
355             {\r
356                 /* Set the members of the incoming PUBLISH operation. */\r
357                 ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );\r
358                 pOperation->incomingPublish = true;\r
359                 pOperation->pMqttConnection = pMqttConnection;\r
360                 pIncomingPacket->u.pIncomingPublish = pOperation;\r
361             }\r
362 \r
363             /* Choose a PUBLISH deserializer. */\r
364             deserialize = _IotMqtt_DeserializePublish;\r
365 \r
366             #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
367                 if( pMqttConnection->pSerializer != NULL )\r
368                 {\r
369                     if( pMqttConnection->pSerializer->deserialize.publish != NULL )\r
370                     {\r
371                         deserialize = pMqttConnection->pSerializer->deserialize.publish;\r
372                     }\r
373                     else\r
374                     {\r
375                         EMPTY_ELSE_MARKER;\r
376                     }\r
377                 }\r
378                 else\r
379                 {\r
380                     EMPTY_ELSE_MARKER;\r
381                 }\r
382             #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
383 \r
384             /* Deserialize incoming PUBLISH. */\r
385             status = deserialize( pIncomingPacket );\r
386 \r
387             if( status == IOT_MQTT_SUCCESS )\r
388             {\r
389                 /* Send a PUBACK for QoS 1 PUBLISH. */\r
390                 if( pOperation->u.publish.publishInfo.qos == IOT_MQTT_QOS_1 )\r
391                 {\r
392                     _sendPuback( pMqttConnection, pIncomingPacket->packetIdentifier );\r
393                 }\r
394                 else\r
395                 {\r
396                     EMPTY_ELSE_MARKER;\r
397                 }\r
398 \r
399                 /* Transfer ownership of the received MQTT packet to the PUBLISH operation. */\r
400                 pOperation->u.publish.pReceivedData = pIncomingPacket->pRemainingData;\r
401                 pIncomingPacket->pRemainingData = NULL;\r
402 \r
403                 /* Add the PUBLISH to the list of operations pending processing. */\r
404                 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
405                 IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),\r
406                                           &( pOperation->link ) );\r
407                 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
408 \r
409                 /* Increment the MQTT connection reference count before scheduling an\r
410                  * incoming PUBLISH. */\r
411                 if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == true )\r
412                 {\r
413                     /* Schedule PUBLISH for callback invocation. */\r
414                     status = _IotMqtt_ScheduleOperation( pOperation, _IotMqtt_ProcessIncomingPublish, 0 );\r
415                 }\r
416                 else\r
417                 {\r
418                     status = IOT_MQTT_NETWORK_ERROR;\r
419                 }\r
420             }\r
421             else\r
422             {\r
423                 EMPTY_ELSE_MARKER;\r
424             }\r
425 \r
426             /* Free PUBLISH operation on error. */\r
427             if( status != IOT_MQTT_SUCCESS )\r
428             {\r
429                 /* Check ownership of the received MQTT packet. */\r
430                 if( pOperation->u.publish.pReceivedData != NULL )\r
431                 {\r
432                     /* Retrieve the pointer MQTT packet pointer so it may be freed later. */\r
433                     IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );\r
434                     pIncomingPacket->pRemainingData = ( uint8_t * ) pOperation->u.publish.pReceivedData;\r
435                 }\r
436                 else\r
437                 {\r
438                     EMPTY_ELSE_MARKER;\r
439                 }\r
440 \r
441                 /* Remove operation from pending processing list. */\r
442                 IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
443 \r
444                 if( IotLink_IsLinked( &( pOperation->link ) ) == true )\r
445                 {\r
446                     IotListDouble_Remove( &( pOperation->link ) );\r
447                 }\r
448                 else\r
449                 {\r
450                     EMPTY_ELSE_MARKER;\r
451                 }\r
452 \r
453                 IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
454 \r
455                 IotMqtt_Assert( pOperation != NULL );\r
456                 IotMqtt_FreeOperation( pOperation );\r
457             }\r
458             else\r
459             {\r
460                 EMPTY_ELSE_MARKER;\r
461             }\r
462 \r
463             break;\r
464 \r
465         case MQTT_PACKET_TYPE_PUBACK:\r
466             IotLogDebug( "(MQTT connection %p) PUBACK in data stream.", pMqttConnection );\r
467 \r
468             /* Choose PUBACK deserializer. */\r
469             deserialize = _IotMqtt_DeserializePuback;\r
470 \r
471             #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
472                 if( pMqttConnection->pSerializer != NULL )\r
473                 {\r
474                     if( pMqttConnection->pSerializer->deserialize.puback != NULL )\r
475                     {\r
476                         deserialize = pMqttConnection->pSerializer->deserialize.puback;\r
477                     }\r
478                     else\r
479                     {\r
480                         EMPTY_ELSE_MARKER;\r
481                     }\r
482                 }\r
483                 else\r
484                 {\r
485                     EMPTY_ELSE_MARKER;\r
486                 }\r
487             #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
488 \r
489             /* Deserialize PUBACK and notify of result. */\r
490             status = deserialize( pIncomingPacket );\r
491             pOperation = _IotMqtt_FindOperation( pMqttConnection,\r
492                                                  IOT_MQTT_PUBLISH_TO_SERVER,\r
493                                                  &( pIncomingPacket->packetIdentifier ) );\r
494 \r
495             if( pOperation != NULL )\r
496             {\r
497                 pOperation->u.operation.status = status;\r
498                 _IotMqtt_Notify( pOperation );\r
499             }\r
500             else\r
501             {\r
502                 EMPTY_ELSE_MARKER;\r
503             }\r
504 \r
505             break;\r
506 \r
507         case MQTT_PACKET_TYPE_SUBACK:\r
508             IotLogDebug( "(MQTT connection %p) SUBACK in data stream.", pMqttConnection );\r
509 \r
510             /* Choose SUBACK deserializer. */\r
511             deserialize = _IotMqtt_DeserializeSuback;\r
512 \r
513             #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
514                 if( pMqttConnection->pSerializer != NULL )\r
515                 {\r
516                     if( pMqttConnection->pSerializer->deserialize.suback != NULL )\r
517                     {\r
518                         deserialize = pMqttConnection->pSerializer->deserialize.suback;\r
519                     }\r
520                     else\r
521                     {\r
522                         EMPTY_ELSE_MARKER;\r
523                     }\r
524                 }\r
525                 else\r
526                 {\r
527                     EMPTY_ELSE_MARKER;\r
528                 }\r
529             #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
530 \r
531             /* Deserialize SUBACK and notify of result. */\r
532             pIncomingPacket->u.pMqttConnection = pMqttConnection;\r
533             status = deserialize( pIncomingPacket );\r
534             pOperation = _IotMqtt_FindOperation( pMqttConnection,\r
535                                                  IOT_MQTT_SUBSCRIBE,\r
536                                                  &( pIncomingPacket->packetIdentifier ) );\r
537 \r
538             if( pOperation != NULL )\r
539             {\r
540                 pOperation->u.operation.status = status;\r
541                 _IotMqtt_Notify( pOperation );\r
542             }\r
543             else\r
544             {\r
545                 EMPTY_ELSE_MARKER;\r
546             }\r
547 \r
548             break;\r
549 \r
550         case MQTT_PACKET_TYPE_UNSUBACK:\r
551             IotLogDebug( "(MQTT connection %p) UNSUBACK in data stream.", pMqttConnection );\r
552 \r
553             /* Choose UNSUBACK deserializer. */\r
554             deserialize = _IotMqtt_DeserializeUnsuback;\r
555 \r
556             #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
557                 if( pMqttConnection->pSerializer != NULL )\r
558                 {\r
559                     if( pMqttConnection->pSerializer->deserialize.unsuback != NULL )\r
560                     {\r
561                         deserialize = pMqttConnection->pSerializer->deserialize.unsuback;\r
562                     }\r
563                     else\r
564                     {\r
565                         EMPTY_ELSE_MARKER;\r
566                     }\r
567                 }\r
568                 else\r
569                 {\r
570                     EMPTY_ELSE_MARKER;\r
571                 }\r
572             #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
573 \r
574             /* Deserialize UNSUBACK and notify of result. */\r
575             status = deserialize( pIncomingPacket );\r
576             pOperation = _IotMqtt_FindOperation( pMqttConnection,\r
577                                                  IOT_MQTT_UNSUBSCRIBE,\r
578                                                  &( pIncomingPacket->packetIdentifier ) );\r
579 \r
580             if( pOperation != NULL )\r
581             {\r
582                 pOperation->u.operation.status = status;\r
583                 _IotMqtt_Notify( pOperation );\r
584             }\r
585             else\r
586             {\r
587                 EMPTY_ELSE_MARKER;\r
588             }\r
589 \r
590             break;\r
591 \r
592         default:\r
593             /* The only remaining valid type is PINGRESP. */\r
594             IotMqtt_Assert( ( pIncomingPacket->type & 0xf0 ) == MQTT_PACKET_TYPE_PINGRESP );\r
595             IotLogDebug( "(MQTT connection %p) PINGRESP in data stream.", pMqttConnection );\r
596 \r
597             /* Choose PINGRESP deserializer. */\r
598             deserialize = _IotMqtt_DeserializePingresp;\r
599 \r
600             #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
601                 if( pMqttConnection->pSerializer != NULL )\r
602                 {\r
603                     if( pMqttConnection->pSerializer->deserialize.pingresp != NULL )\r
604                     {\r
605                         deserialize = pMqttConnection->pSerializer->deserialize.pingresp;\r
606                     }\r
607                     else\r
608                     {\r
609                         EMPTY_ELSE_MARKER;\r
610                     }\r
611                 }\r
612                 else\r
613                 {\r
614                     EMPTY_ELSE_MARKER;\r
615                 }\r
616             #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
617 \r
618             /* Deserialize PINGRESP. */\r
619             status = deserialize( pIncomingPacket );\r
620 \r
621             if( status == IOT_MQTT_SUCCESS )\r
622             {\r
623                 if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),\r
624                                                0,\r
625                                                1 ) == 1 )\r
626                 {\r
627                     IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
628                                  pMqttConnection );\r
629                 }\r
630                 else\r
631                 {\r
632                     IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
633                                 pMqttConnection );\r
634                 }\r
635             }\r
636             else\r
637             {\r
638                 EMPTY_ELSE_MARKER;\r
639             }\r
640 \r
641             break;\r
642     }\r
643 \r
644     if( status != IOT_MQTT_SUCCESS )\r
645     {\r
646         IotLogError( "(MQTT connection %p) Packet parser status %s.",\r
647                      pMqttConnection,\r
648                      IotMqtt_strerror( status ) );\r
649     }\r
650     else\r
651     {\r
652         EMPTY_ELSE_MARKER;\r
653     }\r
654 \r
655     return status;\r
656 }\r
657 \r
658 /*-----------------------------------------------------------*/\r
659 \r
660 static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
661                          uint16_t packetIdentifier )\r
662 {\r
663     IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
664     _mqttOperation_t * pPubackOperation = NULL;\r
665 \r
666     /* Default PUBACK serializer function. */\r
667     IotMqttError_t ( * serializePuback )( uint16_t,\r
668                                           uint8_t **,\r
669                                           size_t * ) = _IotMqtt_SerializePuback;\r
670 \r
671     IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",\r
672                  pMqttConnection,\r
673                  packetIdentifier );\r
674 \r
675     /* Choose PUBACK serializer and free packet functions. */\r
676     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
677         if( pMqttConnection->pSerializer != NULL )\r
678         {\r
679             if( pMqttConnection->pSerializer->serialize.puback != NULL )\r
680             {\r
681                 serializePuback = pMqttConnection->pSerializer->serialize.puback;\r
682             }\r
683             else\r
684             {\r
685                 EMPTY_ELSE_MARKER;\r
686             }\r
687         }\r
688         else\r
689         {\r
690             EMPTY_ELSE_MARKER;\r
691         }\r
692     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
693 \r
694     /* Create a PUBACK operation. */\r
695     status = _IotMqtt_CreateOperation( pMqttConnection,\r
696                                        0,\r
697                                        NULL,\r
698                                        &pPubackOperation );\r
699 \r
700     if( status != IOT_MQTT_SUCCESS )\r
701     {\r
702         IOT_GOTO_CLEANUP();\r
703     }\r
704 \r
705     /* Set the operation type. */\r
706     pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;\r
707 \r
708     /* Generate a PUBACK packet from the packet identifier. */\r
709     status = serializePuback( packetIdentifier,\r
710                               &( pPubackOperation->u.operation.pMqttPacket ),\r
711                               &( pPubackOperation->u.operation.packetSize ) );\r
712 \r
713     if( status != IOT_MQTT_SUCCESS )\r
714     {\r
715         IOT_GOTO_CLEANUP();\r
716     }\r
717 \r
718     /* Add the PUBACK operation to the send queue for network transmission. */\r
719     status = _IotMqtt_ScheduleOperation( pPubackOperation,\r
720                                          _IotMqtt_ProcessSend,\r
721                                          0 );\r
722 \r
723     if( status != IOT_MQTT_SUCCESS )\r
724     {\r
725         IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",\r
726                      pMqttConnection );\r
727 \r
728         IOT_GOTO_CLEANUP();\r
729     }\r
730     else\r
731     {\r
732         EMPTY_ELSE_MARKER;\r
733     }\r
734 \r
735     /* Clean up on error. */\r
736     IOT_FUNCTION_CLEANUP_BEGIN();\r
737 \r
738     if( status != IOT_MQTT_SUCCESS )\r
739     {\r
740         if( pPubackOperation != NULL )\r
741         {\r
742             _IotMqtt_DestroyOperation( pPubackOperation );\r
743         }\r
744         else\r
745         {\r
746             EMPTY_ELSE_MARKER;\r
747         }\r
748     }\r
749     else\r
750     {\r
751         EMPTY_ELSE_MARKER;\r
752     }\r
753 }\r
754 \r
755 /*-----------------------------------------------------------*/\r
756 \r
757 static void _flushPacket( void * pNetworkConnection,\r
758                           const _mqttConnection_t * pMqttConnection,\r
759                           size_t length )\r
760 {\r
761     size_t bytesFlushed = 0;\r
762     uint8_t receivedByte = 0;\r
763 \r
764     for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )\r
765     {\r
766         ( void ) _IotMqtt_GetNextByte( pNetworkConnection,\r
767                                        pMqttConnection->pNetworkInterface,\r
768                                        &receivedByte );\r
769     }\r
770 }\r
771 \r
772 /*-----------------------------------------------------------*/\r
773 \r
774 bool _IotMqtt_GetNextByte( void * pNetworkConnection,\r
775                            const IotNetworkInterface_t * pNetworkInterface,\r
776                            uint8_t * pIncomingByte )\r
777 {\r
778     bool status = false;\r
779     uint8_t incomingByte = 0;\r
780     size_t bytesReceived = 0;\r
781 \r
782     /* Attempt to read 1 byte. */\r
783     bytesReceived = pNetworkInterface->receive( pNetworkConnection,\r
784                                                 &incomingByte,\r
785                                                 1 );\r
786 \r
787     /* Set the output parameter and return success if 1 byte was read. */\r
788     if( bytesReceived == 1 )\r
789     {\r
790         *pIncomingByte = incomingByte;\r
791         status = true;\r
792     }\r
793     else\r
794     {\r
795         /* Network receive must return 0 on failure. */\r
796         IotMqtt_Assert( bytesReceived == 0 );\r
797     }\r
798 \r
799     return status;\r
800 }\r
801 \r
802 /*-----------------------------------------------------------*/\r
803 \r
804 void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason,\r
805                                       _mqttConnection_t * pMqttConnection )\r
806 {\r
807     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
808     IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;\r
809     IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };\r
810     void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;\r
811 \r
812     /* Disconnect callback function. */\r
813     void ( * disconnectCallback )( void *,\r
814                                    IotMqttCallbackParam_t * ) = NULL;\r
815 \r
816     /* Network close function. */\r
817     IotNetworkError_t ( * closeConnection) ( void * ) = NULL;\r
818 \r
819     /* Default free packet function. */\r
820     void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
821 \r
822     /* Mark the MQTT connection as disconnected and the keep-alive as failed. */\r
823     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
824     pMqttConnection->disconnected = true;\r
825 \r
826     if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
827     {\r
828         /* Keep-alive must have a PINGREQ allocated. */\r
829         IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );\r
830         IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );\r
831 \r
832         /* PINGREQ provides a reference to the connection, so reference count must\r
833          * be nonzero. */\r
834         IotMqtt_Assert( pMqttConnection->references > 0 );\r
835 \r
836         /* Attempt to cancel the keep-alive job. */\r
837         taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
838                                                 pMqttConnection->pingreq.job,\r
839                                                 NULL );\r
840 \r
841         /* If the keep-alive job was not canceled, it must be already executing.\r
842          * Any other return value is invalid. */\r
843         IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||\r
844                         ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );\r
845 \r
846         /* Clean up keep-alive if its job was successfully canceled. Otherwise,\r
847          * the executing keep-alive job will clean up itself. */\r
848         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
849         {\r
850             /* Choose a function to free the packet. */\r
851             #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
852                 if( pMqttConnection->pSerializer != NULL )\r
853                 {\r
854                     if( pMqttConnection->pSerializer->freePacket != NULL )\r
855                     {\r
856                         freePacket = pMqttConnection->pSerializer->freePacket;\r
857                     }\r
858                 }\r
859             #endif\r
860 \r
861             freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
862 \r
863             /* Clear data about the keep-alive. */\r
864             pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
865             pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
866             pMqttConnection->pingreq.u.operation.packetSize = 0;\r
867 \r
868             /* Keep-alive is cleaned up; decrement reference count. Since this\r
869              * function must be followed with a call to DISCONNECT, a check to\r
870              * destroy the connection is not done here. */\r
871             pMqttConnection->references--;\r
872 \r
873             IotLogDebug( "(MQTT connection %p) Keep-alive job canceled and cleaned up.",\r
874                          pMqttConnection );\r
875         }\r
876         else\r
877         {\r
878             EMPTY_ELSE_MARKER;\r
879         }\r
880     }\r
881     else\r
882     {\r
883         EMPTY_ELSE_MARKER;\r
884     }\r
885 \r
886     /* Copy the function pointers and contexts, as the MQTT connection may be\r
887      * modified after the mutex is released. */\r
888     disconnectCallback = pMqttConnection->disconnectCallback.function;\r
889     pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;\r
890 \r
891     closeConnection = pMqttConnection->pNetworkInterface->close;\r
892     pNetworkConnection = pMqttConnection->pNetworkConnection;\r
893 \r
894     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
895 \r
896     /* Close the network connection. */\r
897     if( closeConnection != NULL )\r
898     {\r
899         closeStatus = closeConnection( pNetworkConnection );\r
900 \r
901         if( closeStatus == IOT_NETWORK_SUCCESS )\r
902         {\r
903             IotLogInfo( "(MQTT connection %p) Network connection closed.", pMqttConnection );\r
904         }\r
905         else\r
906         {\r
907             IotLogWarn( "(MQTT connection %p) Failed to close network connection, error %d.",\r
908                         pMqttConnection,\r
909                         closeStatus );\r
910         }\r
911     }\r
912     else\r
913     {\r
914         IotLogWarn( "(MQTT connection %p) No network close function was set. Network connection"\r
915                     " not closed.", pMqttConnection );\r
916     }\r
917 \r
918     /* Invoke the disconnect callback. */\r
919     if( disconnectCallback != NULL )\r
920     {\r
921         /* Set the members of the callback parameter. */\r
922         callbackParam.mqttConnection = pMqttConnection;\r
923         callbackParam.u.disconnectReason = disconnectReason;\r
924 \r
925         disconnectCallback( pDisconnectCallbackContext,\r
926                             &callbackParam );\r
927     }\r
928     else\r
929     {\r
930         EMPTY_ELSE_MARKER;\r
931     }\r
932 }\r
933 \r
934 /*-----------------------------------------------------------*/\r
935 \r
936 void IotMqtt_ReceiveCallback( void * pNetworkConnection,\r
937                               void * pReceiveContext )\r
938 {\r
939     IotMqttError_t status = IOT_MQTT_SUCCESS;\r
940     _mqttPacket_t incomingPacket = { .u.pMqttConnection = NULL };\r
941 \r
942     /* Cast context to correct type. */\r
943     _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pReceiveContext;\r
944 \r
945     /* Read an MQTT packet from the network. */\r
946     status = _getIncomingPacket( pNetworkConnection,\r
947                                  pMqttConnection,\r
948                                  &incomingPacket );\r
949 \r
950     if( status == IOT_MQTT_SUCCESS )\r
951     {\r
952         /* Deserialize the received packet. */\r
953         status = _deserializeIncomingPacket( pMqttConnection,\r
954                                              &incomingPacket );\r
955 \r
956         /* Free any buffers allocated for the MQTT packet. */\r
957         if( incomingPacket.pRemainingData != NULL )\r
958         {\r
959             IotMqtt_FreeMessage( incomingPacket.pRemainingData );\r
960         }\r
961         else\r
962         {\r
963             EMPTY_ELSE_MARKER;\r
964         }\r
965     }\r
966     else\r
967     {\r
968         EMPTY_ELSE_MARKER;\r
969     }\r
970 \r
971     /* Close the network connection on a bad response. */\r
972     if( status == IOT_MQTT_BAD_RESPONSE )\r
973     {\r
974         IotLogError( "(MQTT connection %p) Error processing incoming data. Closing connection.",\r
975                      pMqttConnection );\r
976 \r
977         _IotMqtt_CloseNetworkConnection( IOT_MQTT_BAD_PACKET_RECEIVED,\r
978                                          pMqttConnection );\r
979     }\r
980     else\r
981     {\r
982         EMPTY_ELSE_MARKER;\r
983     }\r
984 }\r
985 \r
986 /*-----------------------------------------------------------*/\r