]> git.sur5r.net Git - freertos/blob - FreeRTOS-Labs/Demo/FreeRTOS_IoT_Libraries/mqtt/common/DemoTasks/LightWeightMQTTExample.c
Update version number in readiness for V10.3.0 release. Sync SVN with reviewed releas...
[freertos] / FreeRTOS-Labs / Demo / FreeRTOS_IoT_Libraries / mqtt / common / DemoTasks / LightWeightMQTTExample.c
1 /*\r
2  * FreeRTOS Kernel V10.3.0\r
3  * Copyright (C) 2020 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://www.FreeRTOS.org\r
23  * http://aws.amazon.com/freertos\r
24  *\r
25  * 1 tab == 4 spaces!\r
26  */\r
27 \r
28 /*\r
29  * Proof of Concept for use of MQTT light weight serializer API.\r
30  * The light weight serializer API lets the user serialize and\r
31  * deserialize MQTT messages into user provided buffer.\r
32  * This API allows use of statically allocated buffer.\r
33  *\r
34  * Example shown below uses this API to create MQTT messages and\r
35  * send them over a connection established using FreeRTOS sockets.\r
36  * The example is single threaded and uses statically allocated memory.\r
37  *\r
38  * !!! NOTE !!!\r
39  * This is work in progress to show how light weight serializer\r
40  * API can be used. This is not a complete demo, and should not\r
41  * be treated as production ready code.\r
42  */\r
43 \r
44 /* Standard includes. */\r
45 #include <string.h>\r
46 #include <stdio.h>\r
47 \r
48 /* Kernel includes. */\r
49 #include "FreeRTOS.h"\r
50 #include "task.h"\r
51 \r
52 /* FreeRTOS+TCP includes. */\r
53 #include "FreeRTOS_IP.h"\r
54 #include "FreeRTOS_Sockets.h"\r
55 \r
56 /* IoT SDK includes. */\r
57 #include "iot_mqtt.h"\r
58 #include "iot_mqtt_serialize.h"\r
59 #include "platform/iot_network_freertos.h"\r
60 \r
61 /* Demo Specific configs. */\r
62 #include "mqtt_demo_profile.h"\r
63 \r
64 \r
65 /**\r
66  * @brief Time to wait between each cycle of the demo implemented by prvMQTTDemoTask().\r
67  */\r
68 #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS        ( pdMS_TO_TICKS( 5000 ) )\r
69 \r
70 /**\r
71  * @brief Time to wait before sending ping request to keep MQTT connection alive.\r
72  */\r
73 #define mqttexampleKEEP_ALIVE_DELAY                                     ( pdMS_TO_TICKS( 1000 ) )\r
74 \r
75 \r
76 /**\r
77  * @brief The MQTT client identifier used in this example.  Each client identifier\r
78  * must be unique so edit as required to ensure no two clients connecting to the\r
79  * same broker use the same client identifier.\r
80  */\r
81 #define mqttexampleCLIENT_IDENTIFIER       mqttdemoprofileCLIENT_IDENTIFIER\r
82 \r
83 /**\r
84  * @brief Details of the MQTT broker to connect to.\r
85  */\r
86 #define mqttexampleMQTT_BROKER_ENDPOINT    mqttdemoprofileBROKER_ENDPOINT\r
87 \r
88 /**\r
89  * @brief The port to use for the demo.\r
90  */\r
91 #define mqttexampleMQTT_BROKER_PORT                mqttdemoprofileBROKER_PORT\r
92 \r
93 /**\r
94  * @brief The topic to subscribe and publish to in the example.\r
95  *\r
96  * The topic starts with the client identifier to ensure that each demo interacts\r
97  * with a unique topic.\r
98  */\r
99 #define mqttexampleTOPIC                                   mqttexampleCLIENT_IDENTIFIER "/example/topic"\r
100 \r
101 /**\r
102  * @brief The MQTT message published in this example.\r
103  */\r
104 #define mqttexampleMESSAGE                                 "Hello Light Weight MQTT World!"\r
105 \r
106 /**\r
107  * @brief Dimensions a file scope buffer currently used to send and receive MQTT data from a\r
108  * socket\r
109  */\r
110 #define mqttexampleSHARED_BUFFER_SIZE      500\r
111 \r
112 /*-----------------------------------------------------------*/\r
113 \r
114 /**\r
115  * @brief MQTT Protocol constants used by this demo.\r
116  * These types are defined in internal MQTT include files.\r
117  * For light-weight demo application, only a few are needed, therefore\r
118  * they are redefined here so that internal files need not be included.\r
119  */\r
120 \r
121 /*  MQTT Control Packet Types.*/\r
122 #define MQTT_PACKET_TYPE_CONNACK           ( ( uint8_t ) 0x20U ) /**< @brief CONNACK (server-to-client). */\r
123 #define MQTT_PACKET_TYPE_PUBLISH           ( ( uint8_t ) 0x30U ) /**< @brief PUBLISH (bi-directional). */\r
124 #define MQTT_PACKET_TYPE_SUBACK            ( ( uint8_t ) 0x90U ) /**< @brief SUBACK (server-to-client). */\r
125 #define MQTT_PACKET_TYPE_UNSUBACK          ( ( uint8_t ) 0xb0U ) /**< @brief UNSUBACK (server-to-client). */\r
126 #define MQTT_PACKET_TYPE_PINGRESP          ( ( uint8_t ) 0xd0U ) /**< @brief PINGRESP (server-to-client). */\r
127 /* MQTT Fixed Packet Sizes */\r
128 #define MQTT_PACKET_DISCONNECT_SIZE        ( ( uint8_t ) 2 )     /**< @brief Size of DISCONNECT packet. */\r
129 #define MQTT_PACKET_PINGREQ_SIZE           ( ( uint8_t ) 2 )     /**< @brief Size of PINGREQ packet. */\r
130 \r
131 /*-----------------------------------------------------------*/\r
132 \r
133 /**\r
134  * @brief The task used to demonstrate the MQTT API.\r
135  *\r
136  * @param[in] pvParameters Parameters as passed at the time of task creation. Not\r
137  * used in this example.\r
138  */\r
139 static void prvMQTTDemoTask( void * pvParameters );\r
140 \r
141 /**\r
142  * @brief Creates a TCP connection to the MQTT broker as specified in\r
143  * mqttexampleMQTT_BROKER_ENDPOINT and mqttexampleMQTT_BROKER_PORT.\r
144  *\r
145  * @return On success the socket connected to the MQTT broker is returned.  Otherwise\r
146  * FREERTOS_INVALID_SOCKET is returned.\r
147  *\r
148  */\r
149 static Socket_t prvCreateTCPConnectionToBroker( void );\r
150 \r
151 /**\r
152  * @brief Sends an MQTT Connect packet over the already connected TCP socket.\r
153  *\r
154  * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which\r
155  * an MQTT connection has been established.\r
156  *\r
157  * @return IOT_MQTT_SUCCESS is returned if the reply is a valid connection\r
158  * acknowledgment (CONNACK) packet, otherwise an error code is returned.\r
159  */\r
160 static IotMqttError_t prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket );\r
161 \r
162 /**\r
163  * @brief Performs a graceful shutdown and close of the socket passed in as its\r
164  * parameter.\r
165  *\r
166  * @param xSocket is a TCP socket that is connected to an MQTT broker to which\r
167  * an MQTT connection has been established.\r
168  */\r
169 static void prvGracefulShutDown( Socket_t xSocket );\r
170 \r
171 /**\r
172  * @brief Subscribes to the topic as specified in mqttexampleTOPIC.\r
173  *\r
174  * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which\r
175  * an MQTT connection has been established.\r
176  *\r
177  * @return IOT_MQTT_SUCCESS is returned if the\r
178  * subscription is successful, otherwise an error code is returned.\r
179  */\r
180 static IotMqttError_t prvMQTTSubscribeToTopic( Socket_t xMQTTSocket );\r
181 \r
182 /**\r
183  * @brief  Publishes the message mqttexampleMESSAGE on the mqttexampleTOPIC topic.\r
184  *\r
185  * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which\r
186  * an MQTT connection has been established.\r
187  *\r
188  * @return IOT_MQTT_SUCCESS is returned if Publish is successful,\r
189  * otherwise an error code is returned.\r
190  */\r
191 static IotMqttError_t prvMQTTPublishToTopic( Socket_t xMQTTSocket );\r
192 \r
193 /**\r
194  * @brief  Process Incoming Publish.\r
195  *\r
196  * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which\r
197  * an MQTT connection has been established.\r
198  *\r
199  * @return #IOT_MQTT_SUCCESS is returned if the processing is successful,\r
200  * otherwise an error code is returned.\r
201  */\r
202 static IotMqttError_t prvMQTTProcessIncomingPublish( Socket_t xMQTTSocket );\r
203 \r
204 /**\r
205  * @brief Unsubscribes from the previously subscribed topic as specified\r
206  * in mqttexampleTOPIC.\r
207  *\r
208  * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which\r
209  * an MQTT connection has been established.\r
210  *\r
211  * @return IOT_MQTT_SUCCESS is returned if the\r
212  * unsubscribe is successful, otherwise an error code is returned.\r
213  */\r
214 static IotMqttError_t prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket );\r
215 \r
216 /**\r
217  * @brief Send MQTT Ping Request to broker and receive response.\r
218  * Ping request is used to keep connection to broker alive.\r
219  *\r
220  * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which\r
221  * an MQTT connection has been established.\r
222  *\r
223  * @return IOT_MQTT_SUCCESS is returned if a Ping Response is successfully received,\r
224  * otherwise an error code is returned.\r
225  */\r
226 static IotMqttError_t prvMQTTKeepAlive( Socket_t xMQTTSocket );\r
227 \r
228 /**\r
229  * @brief Disconnect From MQTT Broker.\r
230  *\r
231  * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which\r
232  * an MQTT connection has been established.\r
233  *\r
234  * @return IOT_MQTT_SUCCESS is returned if the disconnect is successful,\r
235  * otherwise an error code is returned.\r
236  */\r
237 static IotMqttError_t prvMQTTDisconnect( Socket_t xMQTTSocket );\r
238 \r
239 \r
240 /*-----------------------------------------------------------*/\r
241 \r
242 /**\r
243  * @brief Function to receive next byte from network,\r
244  * The declaration must match IotMqttGetNextByte_t.\r
245  *\r
246  * @param[in] pvContext Network Connection context. Implementation in this\r
247  * file uses FreeRTOS socket.\r
248  * @param[in, out] pNextByte Pointer to buffer where the byte will be stored.\r
249  *\r
250  * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_TIMEOUT\r
251  */\r
252 \r
253 IotMqttError_t getNextByte( void * pvContext,\r
254                                                         uint8_t * pNextByte );\r
255 \r
256 /*-----------------------------------------------------------*/\r
257 \r
258 /* @brief Static memory buffer used for sending and receiving MQTT messages */\r
259 static uint8_t ucSharedBuffer[ mqttexampleSHARED_BUFFER_SIZE ];\r
260 \r
261 /*-----------------------------------------------------------*/\r
262 \r
263 /*\r
264  * @brief Task for Light Weight MQTT Serializer API Proof of Concept.\r
265  * To run the proof of concept example, in main.c, in function vApplicationIPNetworkEventHook(),\r
266  * replace vStartSimpleMQTTDemo() with vApplicationIPNetworkEventHook().\r
267  */\r
268 void vStartLightWeightMQTTDemo( void )\r
269 {\r
270 TickType_t xShortDelay = ( TickType_t ) pdMS_TO_TICKS( ( TickType_t ) 500 );\r
271 \r
272         /* Wait a short time to allow receipt of the ARP replies. */\r
273         vTaskDelay( xShortDelay );\r
274 \r
275         /* This example uses a single application task, which in turn is used to\r
276          * connect, subscribe, publish, unsubscribe and disconnect from the MQTT\r
277          * broker. */\r
278         xTaskCreate( prvMQTTDemoTask,          /* Function that implements the task. */\r
279                                  "MQTTLWDemo",             /* Text name for the task - only used for debugging. */\r
280                                  democonfigDEMO_STACKSIZE, /* Size of stack (in words, not bytes) to allocate for the task. */\r
281                                  NULL,                     /* Task parameter - not used in this case. */\r
282                                  tskIDLE_PRIORITY,         /* Task priority, must be between 0 and configMAX_PRIORITIES - 1. */\r
283                                  NULL );                   /* Used to pass out a handle to the created task - not used in this case. */\r
284 }\r
285 /*-----------------------------------------------------------*/\r
286 \r
287 static void prvGracefulShutDown( Socket_t xSocket )\r
288 {\r
289 uint8_t ucDummy[ 20 ];\r
290 const TickType_t xShortDelay = pdMS_TO_MIN_TICKS( 250 );\r
291 \r
292         if( xSocket != ( Socket_t ) 0 )\r
293         {\r
294                 if( xSocket != FREERTOS_INVALID_SOCKET )\r
295                 {\r
296                         /* Initiate graceful shutdown. */\r
297                         FreeRTOS_shutdown( xSocket, FREERTOS_SHUT_RDWR );\r
298 \r
299                         /* Wait for the socket to disconnect gracefully (indicated by FreeRTOS_recv()\r
300                          * returning a FREERTOS_EINVAL error) before closing the socket. */\r
301                         while( FreeRTOS_recv( xSocket, ucDummy, sizeof( ucDummy ), 0 ) >= 0 )\r
302                         {\r
303                                 /* Wait for shutdown to complete.  If a receive block time is used then\r
304                                  * this delay will not be necessary as FreeRTOS_recv() will place the RTOS task\r
305                                  * into the Blocked state anyway. */\r
306                                 vTaskDelay( xShortDelay );\r
307 \r
308                                 /* Note ? real applications should implement a timeout here, not just\r
309                                  * loop forever. */\r
310                         }\r
311 \r
312                         /* The socket has shut down and is safe to close. */\r
313                         FreeRTOS_closesocket( xSocket );\r
314                 }\r
315         }\r
316 }\r
317 /*-----------------------------------------------------------*/\r
318 \r
319 IotMqttError_t getNextByte( void * pvContext,\r
320                                                         uint8_t * pNextByte )\r
321 {\r
322 Socket_t xMQTTSocket = ( Socket_t ) pvContext;\r
323 BaseType_t receivedBytes;\r
324 IotMqttError_t result;\r
325 \r
326         /* Receive one byte from network */\r
327         receivedBytes = FreeRTOS_recv( xMQTTSocket, ( void * ) pNextByte, sizeof( uint8_t ), 0 );\r
328 \r
329         if( receivedBytes == sizeof( uint8_t ) )\r
330         {\r
331                 result = IOT_MQTT_SUCCESS;\r
332         }\r
333         else\r
334         {\r
335                 result = IOT_MQTT_TIMEOUT;\r
336         }\r
337 \r
338         return result;\r
339 }\r
340 \r
341 /*-----------------------------------------------------------*/\r
342 \r
343 static void prvMQTTDemoTask( void * pvParameters )\r
344 {\r
345 const TickType_t xNoDelay = ( TickType_t ) 0;\r
346 Socket_t xMQTTSocket;\r
347 IotMqttError_t xReturned;\r
348 uint32_t ulPublishCount = 0;\r
349 const uint32_t ulMaxPublishCount = 5UL;\r
350 \r
351         /* Remove compiler warnings about unused parameters. */\r
352         ( void ) pvParameters;\r
353 \r
354         for( ; ; )\r
355         {\r
356                 /* Don't expect any notifications to be pending yet. */\r
357                 configASSERT( ulTaskNotifyTake( pdTRUE, xNoDelay ) == 0 );\r
358 \r
359                 /****************************** Connect. ******************************/\r
360 \r
361                 /* Establish a TCP connection with the MQTT broker. This example connects to\r
362                  * the MQTT broker as specified in mqttexampleMQTT_BROKER_ENDPOINT and\r
363                  * mqttexampleMQTT_BROKER_PORT at the top of this file. */\r
364                 configPRINTF( ( "Create a TCP connection to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );\r
365                 xMQTTSocket = prvCreateTCPConnectionToBroker();\r
366                 configASSERT( xMQTTSocket != FREERTOS_INVALID_SOCKET );\r
367                 configPRINTF( ( "Connected to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );\r
368 \r
369                 /* Sends an MQTT Connect packet over the already connected TCP socket\r
370                  * xMQTTSocket, then waits for and interprets the reply. IOT_MQTT_SUCCESS is\r
371                  * returned if the reply is a valid connection acknowledgeable (CONNACK) packet,\r
372                  * otherwise an error code is returned. */\r
373                 configPRINTF( ( "Creating an MQTT connection with %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );\r
374                 xReturned = prvCreateMQTTConnectionWithBroker( xMQTTSocket );\r
375                 configASSERT( xReturned == IOT_MQTT_SUCCESS );\r
376                 configPRINTF( ( "Established an MQTT connection.\r\n" ) );\r
377 \r
378                 /**************************** Subscribe. ******************************/\r
379 \r
380                 /* The client is now connected to the broker. Subscribe to the topic\r
381                  * as specified in mqttexampleTOPIC at the top of this file by sending a\r
382                  * subscribe packet then waiting for a subscribe acknowledgment (SUBACK).\r
383                  * This client will then publish to the same topic it subscribed to, so will\r
384                  * expect all the messages it sends to the broker to be sent back to it\r
385                  * from the broker. */\r
386                 configPRINTF( ( "Attempt to subscribed to the MQTT topic %s\r\n", mqttexampleTOPIC ) );\r
387                 xReturned = prvMQTTSubscribeToTopic( xMQTTSocket );\r
388                 configPRINTF( ( "Subscribed to the topic %s\r\n", mqttexampleTOPIC ) );\r
389 \r
390                 /**************************** Publish. ******************************/\r
391                 /* Send publish for with QOS0, Process Keep alive */\r
392                 for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )\r
393                 {\r
394                         configPRINTF( ( "Attempt to publish to the MQTT topic %s\r\n", mqttexampleTOPIC ) );\r
395                         xReturned = prvMQTTPublishToTopic( xMQTTSocket );\r
396                         configASSERT( xReturned == IOT_MQTT_SUCCESS );\r
397                         configPRINTF( ( "Publish successful to the topic %s\r\n", mqttexampleTOPIC ) );\r
398 \r
399                         /* Process incoming publish echo, since application subscribed to the same topic\r
400                          * broker will send publish message back to the application */\r
401                         configPRINTF( ( "Attempt to receive publish message from broker\r\n" ) );\r
402                         xReturned = prvMQTTProcessIncomingPublish( xMQTTSocket );\r
403                         configASSERT( xReturned == IOT_MQTT_SUCCESS );\r
404                         configPRINTF( ( "Successfully Received Publish message from broker\r\n" ) );\r
405 \r
406                         /* Leave Connection Idle for some time */\r
407                         configPRINTF( ( "Keeping Connection Idle\r\n" ) );\r
408                         vTaskDelay( pdMS_TO_TICKS( mqttexampleKEEP_ALIVE_DELAY ) );\r
409                         /* Send Ping request to broker and receive ping response */\r
410                         configPRINTF( ( "Sending Ping Request to the broker\r\n" ) );\r
411                         xReturned = prvMQTTKeepAlive( xMQTTSocket );\r
412                         configASSERT( xReturned == IOT_MQTT_SUCCESS );\r
413                         configPRINTF( ( "Ping Response successfully received\r\n" ) );\r
414                 }\r
415 \r
416                 /************************ Unsubscribe from the topic. **************************/\r
417                 configPRINTF( ( "Attempt to unsubscribe from the MQTT topic %s\r\n", mqttexampleTOPIC ) );\r
418                 xReturned = prvMQTTUnsubscribeFromTopic( xMQTTSocket );\r
419                 configASSERT( xReturned == IOT_MQTT_SUCCESS );\r
420                 configPRINTF( ( "Unsubscribe from the topic %s\r\n", mqttexampleTOPIC ) );\r
421 \r
422                 /**************************** Disconnect. ******************************/\r
423 \r
424                 /* Sends an MQTT Disconnect packet over the already connected TCP socket\r
425                  * xMQTTSocket, then waits for and interprets the reply.  IOT_MQTT_SUCCESS is\r
426                  * returned if the reply is a valid connection acknowledgeable (CONNACK) packet,\r
427                  * otherwise an error code is returned. */\r
428 \r
429                 configPRINTF( ( "Creating an MQTT connection with %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );\r
430                 xReturned = prvMQTTDisconnect( xMQTTSocket );\r
431                 configASSERT( xReturned == IOT_MQTT_SUCCESS );\r
432                 configPRINTF( ( "Established an MQTT connection.\r\n" ) );\r
433                 /* Disconnect from broker. */\r
434                 prvGracefulShutDown( xMQTTSocket );\r
435 \r
436                 /* Wait for some time between two iterations to ensure that we do not\r
437                  * bombard the public test mosquitto broker. */\r
438                 configPRINTF( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u\r\n", xPortGetFreeHeapSize() ) );\r
439                 configPRINTF( ( "Short delay before starting the next iteration.... \r\n\r\n" ) );\r
440                 vTaskDelay( pdMS_TO_TICKS( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ) );\r
441         }\r
442 }\r
443 /*-----------------------------------------------------------*/\r
444 \r
445 Socket_t prvCreateTCPConnectionToBroker( void )\r
446 {\r
447 Socket_t xMQTTSocket;\r
448         struct freertos_sockaddr xBrokerAddress;\r
449         uint32_t ulBrokerIPAddress;\r
450 \r
451         /* This is the socket used to connect to the MQTT broker. */\r
452         xMQTTSocket = FreeRTOS_socket( FREERTOS_AF_INET,\r
453                                                                    FREERTOS_SOCK_STREAM,\r
454                                                                    FREERTOS_IPPROTO_TCP );\r
455 \r
456         configASSERT( xMQTTSocket != FREERTOS_INVALID_SOCKET );\r
457 \r
458         /* Locate then connect to the MQTT broker. */\r
459         ulBrokerIPAddress = FreeRTOS_gethostbyname( mqttexampleMQTT_BROKER_ENDPOINT );\r
460 \r
461         if( ulBrokerIPAddress != 0 )\r
462         {\r
463                 xBrokerAddress.sin_port = FreeRTOS_htons( mqttexampleMQTT_BROKER_PORT );\r
464                 xBrokerAddress.sin_addr = ulBrokerIPAddress;\r
465 \r
466                 if( FreeRTOS_connect( xMQTTSocket, &xBrokerAddress, sizeof( xBrokerAddress ) ) != 0 )\r
467                 {\r
468                         /* Could not connect so delete socket and return an error. */\r
469                         FreeRTOS_closesocket( xMQTTSocket );\r
470                         xMQTTSocket = FREERTOS_INVALID_SOCKET;\r
471                 }\r
472         }\r
473 \r
474         return xMQTTSocket;\r
475 }\r
476 /*-----------------------------------------------------------*/\r
477 \r
478 static IotMqttError_t prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket )\r
479 {\r
480 IotMqttConnectInfo_t xConnectInfo;\r
481 size_t xRemainingLength = 0;\r
482 size_t xPacketSize = 0;\r
483 IotMqttError_t xResult;\r
484 IotMqttPacketInfo_t xIncomingPacket;\r
485 \r
486         /* Many fields not used in this demo so start with everything at 0. */\r
487         memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );\r
488         memset( ( void * ) &xIncomingPacket, 0x00, sizeof( xIncomingPacket ) );\r
489 \r
490         /* Start with a clean session i.e. direct the MQTT broker to discard any\r
491          * previous session data. Also, establishing a connection with clean session\r
492          * will ensure that the broker does not store any data when this client\r
493          * gets disconnected. */\r
494         xConnectInfo.cleanSession = true;\r
495 \r
496         /* The client identifier is used to uniquely identify this MQTT client to\r
497          * the MQTT broker.  In a production device the identifier can be something\r
498          * unique, such as a device serial number. */\r
499         xConnectInfo.pClientIdentifier = mqttexampleCLIENT_IDENTIFIER;\r
500         xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( mqttexampleCLIENT_IDENTIFIER );\r
501 \r
502         /* Get size requirement for the connect packet */\r
503         xResult = IotMqtt_GetConnectPacketSize( &xConnectInfo, &xRemainingLength, &xPacketSize );\r
504         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
505         /* Make sure the packet size is less than static buffer size */\r
506         configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );\r
507         /* Serialize MQTT connect packet into provided buffer */\r
508         xResult = IotMqtt_SerializeConnect( &xConnectInfo, xRemainingLength, ucSharedBuffer, xPacketSize );\r
509         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
510 \r
511         if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )\r
512         {\r
513                 /* Wait for the connection ack. TODO check the receive timeout value. */\r
514 \r
515                 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );\r
516 \r
517                 /* Get packet type and remaining length of the received packet\r
518                  * We cannot assume received data is the connection acknowledgment.\r
519                  * Therefore this function reads type and remaining length of the\r
520                  * received packet, before processing entire packet.\r
521                  */\r
522                 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );\r
523                 configASSERT( xResult == IOT_MQTT_SUCCESS );\r
524                 configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_CONNACK );\r
525                 configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );\r
526 \r
527                 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 )\r
528                         == ( BaseType_t ) xIncomingPacket.remainingLength )\r
529                 {\r
530                         xIncomingPacket.pRemainingData = ucSharedBuffer;\r
531 \r
532                         if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )\r
533                         {\r
534                                 xResult = IOT_MQTT_SERVER_REFUSED;\r
535                         }\r
536                 }\r
537                 else\r
538                 {\r
539                         configPRINTF( ( "Receive Failed while receiving MQTT ConnAck\n" ) );\r
540                         xResult = IOT_MQTT_NETWORK_ERROR;\r
541                 }\r
542         }\r
543         else\r
544         {\r
545                 configPRINTF( ( "Send Failed while connecting to MQTT broker\n" ) );\r
546                 xResult = IOT_MQTT_NETWORK_ERROR;\r
547         }\r
548 \r
549         return xResult;\r
550 }\r
551 /*-----------------------------------------------------------*/\r
552 \r
553 static IotMqttError_t prvMQTTSubscribeToTopic( Socket_t xMQTTSocket )\r
554 {\r
555 IotMqttError_t xResult;\r
556 IotMqttSubscription_t xMQTTSubscription[ 1 ];\r
557 size_t xRemainingLength = 0;\r
558 size_t xPacketSize = 0;\r
559 uint16_t usPacketIdentifier;\r
560 IotMqttPacketInfo_t xIncomingPacket;\r
561 \r
562         /* Some fields not used by this demo so start with everything at 0. */\r
563         memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );\r
564 \r
565         /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to only one topic */\r
566         xMQTTSubscription[ 0 ].qos = IOT_MQTT_QOS_0;\r
567         xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;\r
568         xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );\r
569 \r
570         xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_SUBSCRIBE,\r
571                                                                                                  xMQTTSubscription,\r
572                                                                                                  sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),\r
573                                                                                                  &xRemainingLength, &xPacketSize );\r
574         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
575         /* Make sure the packet size is less than static buffer size */\r
576         configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );\r
577 \r
578         /* Serialize subscribe into statically allocated ucSharedBuffer */\r
579         xResult = IotMqtt_SerializeSubscribe( xMQTTSubscription,\r
580                                                                                   sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),\r
581                                                                                   xRemainingLength,\r
582                                                                                   &usPacketIdentifier,\r
583                                                                                   ucSharedBuffer,\r
584                                                                                   xPacketSize );\r
585 \r
586         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
587 \r
588         if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )\r
589         {\r
590                 /* Wait for the subscription ack. The socket is already connected to the MQTT broker, so\r
591                  * publishes to this client can occur at any time and we cannot assume received\r
592                  * data is the subscription acknowledgment.  Therefore this function is, at this\r
593                  * time, doing what would otherwise be done wherever incoming packets are\r
594                  * interpreted (in a callback, or whatever). */\r
595                 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );\r
596                 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );\r
597                 configASSERT( xResult == IOT_MQTT_SUCCESS );\r
598                 configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_SUBACK );\r
599                 configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );\r
600 \r
601                 /* Receive the remaining bytes. */\r
602                 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )\r
603                 {\r
604                         xIncomingPacket.pRemainingData = ucSharedBuffer;\r
605 \r
606                         if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )\r
607                         {\r
608                                 xResult = IOT_MQTT_BAD_RESPONSE;\r
609                         }\r
610                 }\r
611                 else\r
612                 {\r
613                         xResult = IOT_MQTT_NETWORK_ERROR;\r
614                 }\r
615         }\r
616         else\r
617         {\r
618                 xResult = IOT_MQTT_NETWORK_ERROR;\r
619         }\r
620 \r
621         return xResult;\r
622 }\r
623 /*-----------------------------------------------------------*/\r
624 \r
625 static IotMqttError_t prvMQTTPublishToTopic( Socket_t xMQTTSocket )\r
626 {\r
627 IotMqttError_t xResult;\r
628 IotMqttPublishInfo_t xMQTTPublishInfo;\r
629 size_t xRemainingLength = 0;\r
630 size_t xPacketSize = 0;\r
631 uint16_t usPacketIdentifier;\r
632 uint8_t * pusPacketIdentifierHigh;\r
633 \r
634         /* Some fields not used by this demo so start with everything at 0. */\r
635         memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );\r
636         xMQTTPublishInfo.qos = IOT_MQTT_QOS_0;\r
637         xMQTTPublishInfo.retain = false;\r
638         xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;\r
639         xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );\r
640         xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;\r
641         xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );\r
642 \r
643         /* Find out length of Publish packet size. */\r
644         xResult = IotMqtt_GetPublishPacketSize( &xMQTTPublishInfo, &xRemainingLength, &xPacketSize );\r
645         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
646         /* Make sure the packet size is less than static buffer size */\r
647         configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );\r
648 \r
649         xResult = IotMqtt_SerializePublish( &xMQTTPublishInfo,\r
650                                                                                 xRemainingLength,\r
651                                                                                 &usPacketIdentifier,\r
652                                                                                 &pusPacketIdentifierHigh,\r
653                                                                                 ucSharedBuffer,\r
654                                                                                 xPacketSize );\r
655         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
656 \r
657         if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) != ( BaseType_t ) xPacketSize )\r
658         {\r
659                 xResult = IOT_MQTT_NETWORK_ERROR;\r
660         }\r
661         else\r
662         {\r
663                 /* Send success. Since in this case, we are using IOT_MQTT_QOS_0,\r
664                  * there will not be any PubAck. Publish will be echoed back, which is processed\r
665                  * in prvMQTTProcessIncomingPublish() */\r
666                 xResult = IOT_MQTT_SUCCESS;\r
667         }\r
668 \r
669         return xResult;\r
670 }\r
671 /*-----------------------------------------------------------*/\r
672 \r
673 static IotMqttError_t prvMQTTProcessIncomingPublish( Socket_t xMQTTSocket )\r
674 {\r
675 IotMqttError_t xResult;\r
676 IotMqttPacketInfo_t xIncomingPacket;\r
677 \r
678         memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );\r
679         xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );\r
680         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
681         configASSERT( ( xIncomingPacket.type & 0xf0 ) == MQTT_PACKET_TYPE_PUBLISH );\r
682         configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );\r
683 \r
684         /* Receive the remaining bytes. */\r
685         if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )\r
686         {\r
687                 xIncomingPacket.pRemainingData = ucSharedBuffer;\r
688 \r
689                 if( IotMqtt_DeserializePublish( &xIncomingPacket ) != IOT_MQTT_SUCCESS )\r
690                 {\r
691                         xResult = IOT_MQTT_BAD_RESPONSE;\r
692                 }\r
693                 else\r
694                 {\r
695                         /* Process incoming Publish */\r
696                         configPRINTF( ( "Incoming QOS : %d\n", xIncomingPacket.pubInfo.qos ) );\r
697                         configPRINTF( ( "Incoming Publish Topic Name: %.*s\n", xIncomingPacket.pubInfo.topicNameLength, xIncomingPacket.pubInfo.pTopicName ) );\r
698                         configPRINTF( ( "Incoming Publish Message : %.*s\n", xIncomingPacket.pubInfo.payloadLength, xIncomingPacket.pubInfo.pPayload ) );\r
699                 }\r
700         }\r
701         else\r
702         {\r
703                 xResult = IOT_MQTT_NETWORK_ERROR;\r
704         }\r
705 \r
706         return xResult;\r
707 }\r
708 \r
709 /*-----------------------------------------------------------*/\r
710 \r
711 static IotMqttError_t prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket )\r
712 {\r
713 IotMqttError_t xResult;\r
714 IotMqttSubscription_t xMQTTSubscription[ 1 ];\r
715 size_t xRemainingLength;\r
716 size_t xPacketSize;\r
717 uint16_t usPacketIdentifier;\r
718 IotMqttPacketInfo_t xIncomingPacket;\r
719 \r
720         /* Some fields not used by this demo so start with everything at 0. */\r
721         memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );\r
722 \r
723         /* Unsubscribe to the mqttexampleTOPIC topic filter. The task handle is passed\r
724          * as the callback context which is used by the callback to send a task\r
725          * notification to this task.*/\r
726         xMQTTSubscription[ 0 ].qos = IOT_MQTT_QOS_0;\r
727         xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;\r
728         xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );\r
729 \r
730         xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_UNSUBSCRIBE,\r
731                                                                                                  xMQTTSubscription,\r
732                                                                                                  sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),\r
733                                                                                                  &xRemainingLength,\r
734                                                                                                  &xPacketSize );\r
735         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
736         /* Make sure the packet size is less than static buffer size */\r
737         configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );\r
738 \r
739         xResult = IotMqtt_SerializeUnsubscribe( xMQTTSubscription,\r
740                                                                                         sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),\r
741                                                                                         xRemainingLength,\r
742                                                                                         &usPacketIdentifier,\r
743                                                                                         ucSharedBuffer,\r
744                                                                                         xPacketSize );\r
745         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
746 \r
747         if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )\r
748         {\r
749                 /* Wait for the subscription ack. The socket is already connected to the MQTT broker, so\r
750                  * publishes to this client can occur at any time and we cannot assume received\r
751                  * data is the subscription acknowledgment.  Therefore this function is, at this\r
752                  * time, doing what would otherwise be done wherever incoming packets are\r
753                  * interpreted (in a callback, or whatever). */\r
754                 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );\r
755                 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );\r
756                 configASSERT( xResult == IOT_MQTT_SUCCESS );\r
757                 configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_UNSUBACK );\r
758                 configASSERT( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) );\r
759 \r
760                 /* Receive the remaining bytes. */\r
761                 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )\r
762                 {\r
763                         xIncomingPacket.pRemainingData = ucSharedBuffer;\r
764 \r
765                         if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )\r
766                         {\r
767                                 xResult = IOT_MQTT_BAD_RESPONSE;\r
768                         }\r
769                 }\r
770                 else\r
771                 {\r
772                         xResult = IOT_MQTT_NETWORK_ERROR;\r
773                 }\r
774         }\r
775         else\r
776         {\r
777                 xResult = IOT_MQTT_NETWORK_ERROR;\r
778         }\r
779 \r
780         return xResult;\r
781 }\r
782 /*-----------------------------------------------------------*/\r
783 \r
784 static IotMqttError_t prvMQTTKeepAlive( Socket_t xMQTTSocket )\r
785 {\r
786 IotMqttError_t xResult;\r
787 IotMqttPacketInfo_t xIncomingPacket;\r
788 \r
789         /* PingReq is fixed length packet, therefore there is no need to calculate the size,\r
790          * just makes sure static buffer can accommodate ping request */\r
791 \r
792         configASSERT( MQTT_PACKET_PINGREQ_SIZE <= mqttexampleSHARED_BUFFER_SIZE );\r
793 \r
794         xResult = IotMqtt_SerializePingreq( ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE );\r
795         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
796 \r
797         if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE, 0 ) == ( BaseType_t ) MQTT_PACKET_PINGREQ_SIZE )\r
798         {\r
799                 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );\r
800                 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );\r
801                 configASSERT( xResult == IOT_MQTT_SUCCESS );\r
802                 configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_PINGRESP );\r
803                 configASSERT( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) );\r
804 \r
805                 /* Receive the remaining bytes. */\r
806                 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 )\r
807                         == ( BaseType_t ) xIncomingPacket.remainingLength )\r
808                 {\r
809                         xIncomingPacket.pRemainingData = ucSharedBuffer;\r
810 \r
811                         if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )\r
812                         {\r
813                                 xResult = IOT_MQTT_BAD_RESPONSE;\r
814                         }\r
815                 }\r
816                 else\r
817                 {\r
818                         xResult = IOT_MQTT_NETWORK_ERROR;\r
819                 }\r
820         }\r
821         else\r
822         {\r
823                 xResult = IOT_MQTT_NETWORK_ERROR;\r
824         }\r
825 \r
826         return xResult;\r
827 }\r
828 \r
829 /*-----------------------------------------------------------*/\r
830 \r
831 static IotMqttError_t prvMQTTDisconnect( Socket_t xMQTTSocket )\r
832 {\r
833 IotMqttError_t xResult;\r
834 \r
835         /* Disconnect is fixed length packet, therefore there is no need to calculate the size,\r
836          * just makes sure static buffer can accommodate disconnect request */\r
837 \r
838         configASSERT( MQTT_PACKET_DISCONNECT_SIZE <= mqttexampleSHARED_BUFFER_SIZE );\r
839 \r
840         xResult = IotMqtt_SerializeDisconnect( ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE );\r
841         configASSERT( xResult == IOT_MQTT_SUCCESS );\r
842 \r
843         if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE, 0 ) == ( BaseType_t ) MQTT_PACKET_DISCONNECT_SIZE )\r
844         {\r
845                 xResult = IOT_MQTT_SUCCESS;\r
846         }\r
847         else\r
848         {\r
849                 xResult = IOT_MQTT_NETWORK_ERROR;\r
850         }\r
851 \r
852         return xResult;\r
853 }\r