2 * FreeRTOS Kernel V10.3.0
\r
3 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://www.FreeRTOS.org
\r
23 * http://aws.amazon.com/freertos
\r
25 * 1 tab == 4 spaces!
\r
29 * Proof of Concept for use of MQTT light weight serializer API.
\r
30 * The light weight serializer API lets the user serialize and
\r
31 * deserialize MQTT messages into user provided buffer.
\r
32 * This API allows use of statically allocated buffer.
\r
34 * Example shown below uses this API to create MQTT messages and
\r
35 * send them over a connection established using FreeRTOS sockets.
\r
36 * The example is single threaded and uses statically allocated memory.
\r
39 * This is work in progress to show how light weight serializer
\r
40 * API can be used. This is not a complete demo, and should not
\r
41 * be treated as production ready code.
\r
44 /* Standard includes. */
\r
48 /* Kernel includes. */
\r
49 #include "FreeRTOS.h"
\r
52 /* FreeRTOS+TCP includes. */
\r
53 #include "FreeRTOS_IP.h"
\r
54 #include "FreeRTOS_Sockets.h"
\r
56 /* IoT SDK includes. */
\r
57 #include "iot_mqtt.h"
\r
58 #include "iot_mqtt_serialize.h"
\r
59 #include "platform/iot_network_freertos.h"
\r
61 /* Demo Specific configs. */
\r
62 #include "mqtt_demo_profile.h"
\r
66 * @brief Time to wait between each cycle of the demo implemented by prvMQTTDemoTask().
\r
/* NOTE(review): both delay macros in this group already expand to
 * pdMS_TO_TICKS( ... ), so their values are tick counts rather than
 * milliseconds. prvMQTTDemoTask() passes each of them through pdMS_TO_TICKS()
 * a second time before calling vTaskDelay() - confirm which form is intended. */
68 #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000 ) )
\r
71 * @brief Time to wait before sending ping request to keep MQTT connection alive.
\r
73 #define mqttexampleKEEP_ALIVE_DELAY ( pdMS_TO_TICKS( 1000 ) )
\r
77 * @brief The MQTT client identifier used in this example. Each client identifier
\r
78 * must be unique so edit as required to ensure no two clients connecting to the
\r
79 * same broker use the same client identifier.
\r
81 #define mqttexampleCLIENT_IDENTIFIER mqttdemoprofileCLIENT_IDENTIFIER
\r
84 * @brief Details of the MQTT broker to connect to.
\r
86 #define mqttexampleMQTT_BROKER_ENDPOINT mqttdemoprofileBROKER_ENDPOINT
\r
89 * @brief The port to use for the demo.
\r
91 #define mqttexampleMQTT_BROKER_PORT mqttdemoprofileBROKER_PORT
\r
94 * @brief The topic to subscribe and publish to in the example.
\r
96 * The topic starts with the client identifier to ensure that each demo interacts
\r
97 * with a unique topic.
\r
99 #define mqttexampleTOPIC mqttexampleCLIENT_IDENTIFIER "/example/topic"
\r
102 * @brief The MQTT message published in this example.
\r
104 #define mqttexampleMESSAGE "Hello Light Weight MQTT World!"
\r
107 * @brief Dimensions a file scope buffer currently used to send and receive MQTT data from a
\r
110 #define mqttexampleSHARED_BUFFER_SIZE 500
\r
112 /*-----------------------------------------------------------*/
\r
115 * @brief MQTT Protocol constants used by this demo.
\r
116 * These types are defined in internal MQTT include files.
\r
117 * For light-weight demo application, only a few are needed, therefore
\r
118 * they are redefined here so that internal files need not be included.
\r
121 /* MQTT Control Packet Types.*/
\r
/* The packet type values below are compared in this file with the type field
 * of a received IotMqttPacketInfo_t. CONNACK, SUBACK, UNSUBACK and PINGRESP
 * are compared for equality; PUBLISH is compared after masking the type with
 * 0xf0. */
122 #define MQTT_PACKET_TYPE_CONNACK ( ( uint8_t ) 0x20U ) /**< @brief CONNACK (server-to-client). */
\r
123 #define MQTT_PACKET_TYPE_PUBLISH ( ( uint8_t ) 0x30U ) /**< @brief PUBLISH (bi-directional). */
\r
124 #define MQTT_PACKET_TYPE_SUBACK ( ( uint8_t ) 0x90U ) /**< @brief SUBACK (server-to-client). */
\r
125 #define MQTT_PACKET_TYPE_UNSUBACK ( ( uint8_t ) 0xb0U ) /**< @brief UNSUBACK (server-to-client). */
\r
126 #define MQTT_PACKET_TYPE_PINGRESP ( ( uint8_t ) 0xd0U ) /**< @brief PINGRESP (server-to-client). */
\r
127 /* MQTT fixed packet sizes. In this file each one is used both as the buffer
 * size given to the matching serializer and as the length given to
 * FreeRTOS_send(). */
\r
128 #define MQTT_PACKET_DISCONNECT_SIZE ( ( uint8_t ) 2 ) /**< @brief Size of DISCONNECT packet. */
\r
129 #define MQTT_PACKET_PINGREQ_SIZE ( ( uint8_t ) 2 ) /**< @brief Size of PINGREQ packet. */
\r
131 /*-----------------------------------------------------------*/
\r
134 * @brief The task used to demonstrate the MQTT API.
\r
136 * @param[in] pvParameters Parameters as passed at the time of task creation. Not
\r
137 * used in this example.
\r
139 static void prvMQTTDemoTask( void * pvParameters );
\r
142 * @brief Creates a TCP connection to the MQTT broker as specified in
\r
143 * mqttexampleMQTT_BROKER_ENDPOINT and mqttexampleMQTT_BROKER_PORT.
\r
145 * @return On success the socket connected to the MQTT broker is returned. Otherwise
\r
146 * FREERTOS_INVALID_SOCKET is returned.
\r
149 static Socket_t prvCreateTCPConnectionToBroker( void );
\r
152 * @brief Sends an MQTT Connect packet over the already connected TCP socket.
\r
154 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
\r
155 * an MQTT connection has been established.
\r
157 * @return IOT_MQTT_SUCCESS is returned if the reply is a valid connection
\r
158 * acknowledgment (CONNACK) packet, otherwise an error code is returned.
\r
160 static IotMqttError_t prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket );
\r
163 * @brief Performs a graceful shutdown and close of the socket passed in as its
\r
166 * @param xSocket is a TCP socket that is connected to an MQTT broker to which
\r
167 * an MQTT connection has been established.
\r
169 static void prvGracefulShutDown( Socket_t xSocket );
\r
172 * @brief Subscribes to the topic as specified in mqttexampleTOPIC.
\r
174 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
\r
175 * an MQTT connection has been established.
\r
177 * @return IOT_MQTT_SUCCESS is returned if the
\r
178 * subscription is successful, otherwise an error code is returned.
\r
180 static IotMqttError_t prvMQTTSubscribeToTopic( Socket_t xMQTTSocket );
\r
183 * @brief Publishes the message mqttexampleMESSAGE on the mqttexampleTOPIC topic.
\r
185 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
\r
186 * an MQTT connection has been established.
\r
188 * @return IOT_MQTT_SUCCESS is returned if Publish is successful,
\r
189 * otherwise an error code is returned.
\r
191 static IotMqttError_t prvMQTTPublishToTopic( Socket_t xMQTTSocket );
\r
194 * @brief Process Incoming Publish.
\r
196 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
\r
197 * an MQTT connection has been established.
\r
199 * @return #IOT_MQTT_SUCCESS is returned if the processing is successful,
\r
200 * otherwise an error code is returned.
\r
202 static IotMqttError_t prvMQTTProcessIncomingPublish( Socket_t xMQTTSocket );
\r
205 * @brief Unsubscribes from the previously subscribed topic as specified
\r
206 * in mqttexampleTOPIC.
\r
208 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
\r
209 * an MQTT connection has been established.
\r
211 * @return IOT_MQTT_SUCCESS is returned if the
\r
212 * unsubscribe is successful, otherwise an error code is returned.
\r
214 static IotMqttError_t prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket );
\r
217 * @brief Send MQTT Ping Request to broker and receive response.
\r
218 * Ping request is used to keep connection to broker alive.
\r
220 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
\r
221 * an MQTT connection has been established.
\r
223 * @return IOT_MQTT_SUCCESS is returned if a Ping Response is successfully received,
\r
224 * otherwise an error code is returned.
\r
226 static IotMqttError_t prvMQTTKeepAlive( Socket_t xMQTTSocket );
\r
229 * @brief Disconnect From MQTT Broker.
\r
231 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
\r
232 * an MQTT connection has been established.
\r
234 * @return IOT_MQTT_SUCCESS is returned if the disconnect is successful,
\r
235 * otherwise an error code is returned.
\r
237 static IotMqttError_t prvMQTTDisconnect( Socket_t xMQTTSocket );
\r
240 /*-----------------------------------------------------------*/
\r
243 * @brief Function to receive next byte from network,
\r
244 * The declaration must match IotMqttGetNextByte_t.
\r
246 * @param[in] pvContext Network Connection context. Implementation in this
\r
247 * file uses FreeRTOS socket.
\r
248 * @param[in, out] pNextByte Pointer to buffer where the byte will be stored.
\r
250 * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_TIMEOUT
\r
253 IotMqttError_t getNextByte( void * pvContext,
\r
254 uint8_t * pNextByte );
\r
256 /*-----------------------------------------------------------*/
\r
258 /* @brief Static memory buffer used for sending and receiving MQTT messages */
\r
259 static uint8_t ucSharedBuffer[ mqttexampleSHARED_BUFFER_SIZE ];
\r
261 /*-----------------------------------------------------------*/
\r
264 * @brief Task for Light Weight MQTT Serializer API Proof of Concept.
\r
265 * To run the proof of concept example, in main.c, in function vApplicationIPNetworkEventHook(),
\r
266 * replace vStartSimpleMQTTDemo() with vStartLightWeightMQTTDemo().
\r
/**
 * @brief Entry point of the light weight MQTT demo.
 *
 * Delays the calling task for 500 ms (converted to ticks), then creates one
 * task that runs prvMQTTDemoTask(). The task is named "MQTTLWDemo", gets
 * democonfigDEMO_STACKSIZE words of stack and runs at tskIDLE_PRIORITY. No
 * task parameter is passed and no task handle is requested.
 *
 * NOTE(review): the value returned by xTaskCreate() is not checked in the
 * visible code, so a failed task creation would go unnoticed - confirm this
 * is acceptable for the demo.
 */
268 void vStartLightWeightMQTTDemo( void )
\r
270 TickType_t xShortDelay = ( TickType_t ) pdMS_TO_TICKS( ( TickType_t ) 500 );
\r
272 /* Wait a short time to allow receipt of the ARP replies. */
\r
273 vTaskDelay( xShortDelay );
\r
275 /* This example uses a single application task, which in turn is used to
\r
276 * connect, subscribe, publish, unsubscribe and disconnect from the MQTT
\r
278 xTaskCreate( prvMQTTDemoTask, /* Function that implements the task. */
\r
279 "MQTTLWDemo", /* Text name for the task - only used for debugging. */
\r
280 democonfigDEMO_STACKSIZE, /* Size of stack (in words, not bytes) to allocate for the task. */
\r
281 NULL, /* Task parameter - not used in this case. */
\r
282 tskIDLE_PRIORITY, /* Task priority, must be between 0 and configMAX_PRIORITIES - 1. */
\r
283 NULL ); /* Used to pass out a handle to the created task - not used in this case. */
\r
285 /*-----------------------------------------------------------*/
\r
/**
 * @brief Shuts down a TCP socket and then closes it.
 *
 * The visible code tests that xSocket is neither ( Socket_t ) 0 nor
 * FREERTOS_INVALID_SOCKET and then calls FreeRTOS_shutdown() with
 * FREERTOS_SHUT_RDWR. It keeps calling FreeRTOS_recv() into a 20 byte scratch
 * buffer while that call returns a value >= 0, delaying for
 * pdMS_TO_MIN_TICKS( 250 ) between calls, and finally calls
 * FreeRTOS_closesocket().
 *
 * NOTE(review): the wait loop has no iteration limit or timeout, so a peer
 * that never completes the shutdown would keep this task looping. The
 * existing in-line note says the same - consider bounding the loop.
 *
 * @param[in] xSocket Socket to shut down and close.
 */
287 static void prvGracefulShutDown( Socket_t xSocket )
\r
289 uint8_t ucDummy[ 20 ];
\r
290 const TickType_t xShortDelay = pdMS_TO_MIN_TICKS( 250 );
\r
292 if( xSocket != ( Socket_t ) 0 )
\r
294 if( xSocket != FREERTOS_INVALID_SOCKET )
\r
296 /* Initiate graceful shutdown. */
\r
297 FreeRTOS_shutdown( xSocket, FREERTOS_SHUT_RDWR );
\r
299 /* Wait for the socket to disconnect gracefully (indicated by FreeRTOS_recv()
\r
300 * returning a FREERTOS_EINVAL error) before closing the socket. */
\r
301 while( FreeRTOS_recv( xSocket, ucDummy, sizeof( ucDummy ), 0 ) >= 0 )
\r
303 /* Wait for shutdown to complete. If a receive block time is used then
\r
304 * this delay will not be necessary as FreeRTOS_recv() will place the RTOS task
\r
305 * into the Blocked state anyway. */
\r
306 vTaskDelay( xShortDelay );
\r
308 /* Note ? real applications should implement a timeout here, not just
\r
312 /* The socket has shut down and is safe to close. */
\r
313 FreeRTOS_closesocket( xSocket );
\r
317 /*-----------------------------------------------------------*/
\r
/**
 * @brief Reads a single byte from the socket carried in pvContext.
 *
 * This function is passed as the byte-reader callback to
 * IotMqtt_GetIncomingMQTTPacketTypeAndLength() elsewhere in this file.
 * pvContext is cast back to a Socket_t and FreeRTOS_recv() is asked for
 * sizeof( uint8_t ) bytes.
 *
 * NOTE(review): any FreeRTOS_recv() result other than exactly one byte
 * appears to be mapped to IOT_MQTT_TIMEOUT, so a socket error would not be
 * distinguishable from a timeout - confirm against the complete function.
 *
 * @param[in] pvContext The Socket_t to read from, passed as a void pointer.
 * @param[out] pNextByte Receives the byte that was read.
 *
 * @return IOT_MQTT_SUCCESS when one byte was received; IOT_MQTT_TIMEOUT is
 * the only other value assigned in the visible code.
 */
319 IotMqttError_t getNextByte( void * pvContext,
\r
320 uint8_t * pNextByte )
\r
322 Socket_t xMQTTSocket = ( Socket_t ) pvContext;
\r
323 BaseType_t receivedBytes;
\r
324 IotMqttError_t result;
\r
326 /* Receive one byte from network */
\r
327 receivedBytes = FreeRTOS_recv( xMQTTSocket, ( void * ) pNextByte, sizeof( uint8_t ), 0 );
\r
329 if( receivedBytes == sizeof( uint8_t ) )
\r
331 result = IOT_MQTT_SUCCESS;
\r
335 result = IOT_MQTT_TIMEOUT;
\r
341 /*-----------------------------------------------------------*/
\r
/**
 * @brief Demo task body: runs one full MQTT session over a FreeRTOS+TCP socket.
 *
 * Sequence in the visible code: open a TCP connection, send CONNECT and check
 * the CONNACK, subscribe to mqttexampleTOPIC, then for ulMaxPublishCount (5)
 * rounds publish, read back the echoed publish, delay and exchange a
 * PINGREQ/PINGRESP. After the rounds it unsubscribes, sends DISCONNECT, shuts
 * the socket down and delays before the next iteration. Most steps are
 * checked with configASSERT() rather than handled as recoverable errors.
 *
 * @param[in] pvParameters Unused; cast to void to silence compiler warnings.
 */
343 static void prvMQTTDemoTask( void * pvParameters )
\r
345 const TickType_t xNoDelay = ( TickType_t ) 0;
\r
346 Socket_t xMQTTSocket;
\r
347 IotMqttError_t xReturned;
\r
348 uint32_t ulPublishCount = 0;
\r
349 const uint32_t ulMaxPublishCount = 5UL;
\r
351 /* Remove compiler warnings about unused parameters. */
\r
352 ( void ) pvParameters;
\r
356 /* Don't expect any notifications to be pending yet. */
\r
357 configASSERT( ulTaskNotifyTake( pdTRUE, xNoDelay ) == 0 );
\r
359 /****************************** Connect. ******************************/
\r
361 /* Establish a TCP connection with the MQTT broker. This example connects to
\r
362 * the MQTT broker as specified in mqttexampleMQTT_BROKER_ENDPOINT and
\r
363 * mqttexampleMQTT_BROKER_PORT at the top of this file. */
\r
364 configPRINTF( ( "Create a TCP connection to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
\r
365 xMQTTSocket = prvCreateTCPConnectionToBroker();
\r
366 configASSERT( xMQTTSocket != FREERTOS_INVALID_SOCKET );
\r
367 configPRINTF( ( "Connected to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
\r
369 /* Sends an MQTT Connect packet over the already connected TCP socket
\r
370 * xMQTTSocket, then waits for and interprets the reply. IOT_MQTT_SUCCESS is
\r
371 * returned if the reply is a valid connection acknowledgeable (CONNACK) packet,
\r
372 * otherwise an error code is returned. */
\r
373 configPRINTF( ( "Creating an MQTT connection with %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
\r
374 xReturned = prvCreateMQTTConnectionWithBroker( xMQTTSocket );
\r
375 configASSERT( xReturned == IOT_MQTT_SUCCESS );
\r
376 configPRINTF( ( "Established an MQTT connection.\r\n" ) );
\r
378 /**************************** Subscribe. ******************************/
\r
380 /* The client is now connected to the broker. Subscribe to the topic
\r
381 * as specified in mqttexampleTOPIC at the top of this file by sending a
\r
382 * subscribe packet then waiting for a subscribe acknowledgment (SUBACK).
\r
383 * This client will then publish to the same topic it subscribed to, so will
\r
384 * expect all the messages it sends to the broker to be sent back to it
\r
385 * from the broker. */
\r
386 configPRINTF( ( "Attempt to subscribed to the MQTT topic %s\r\n", mqttexampleTOPIC ) );
\r
387 xReturned = prvMQTTSubscribeToTopic( xMQTTSocket );
/* NOTE(review): unlike the other steps in this task, xReturned is not
 * checked with configASSERT() after the subscribe call, so the success
 * message below is printed even when the subscribe failed. */
\r
388 configPRINTF( ( "Subscribed to the topic %s\r\n", mqttexampleTOPIC ) );
\r
390 /**************************** Publish. ******************************/
\r
391 /* Send publish for with QOS0, Process Keep alive */
\r
392 for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
\r
394 configPRINTF( ( "Attempt to publish to the MQTT topic %s\r\n", mqttexampleTOPIC ) );
\r
395 xReturned = prvMQTTPublishToTopic( xMQTTSocket );
\r
396 configASSERT( xReturned == IOT_MQTT_SUCCESS );
\r
397 configPRINTF( ( "Publish successful to the topic %s\r\n", mqttexampleTOPIC ) );
\r
399 /* Process incoming publish echo, since application subscribed to the same topic
\r
400 * broker will send publish message back to the application */
\r
401 configPRINTF( ( "Attempt to receive publish message from broker\r\n" ) );
\r
402 xReturned = prvMQTTProcessIncomingPublish( xMQTTSocket );
\r
403 configASSERT( xReturned == IOT_MQTT_SUCCESS );
\r
404 configPRINTF( ( "Successfully Received Publish message from broker\r\n" ) );
\r
406 /* Leave Connection Idle for some time */
\r
407 configPRINTF( ( "Keeping Connection Idle\r\n" ) );
/* NOTE(review): mqttexampleKEEP_ALIVE_DELAY is defined as
 * pdMS_TO_TICKS( 1000 ), so the value is converted from milliseconds to
 * ticks twice here. The two conversions only cancel out when one tick is
 * one millisecond. */
\r
408 vTaskDelay( pdMS_TO_TICKS( mqttexampleKEEP_ALIVE_DELAY ) );
\r
409 /* Send Ping request to broker and receive ping response */
\r
410 configPRINTF( ( "Sending Ping Request to the broker\r\n" ) );
\r
411 xReturned = prvMQTTKeepAlive( xMQTTSocket );
\r
412 configASSERT( xReturned == IOT_MQTT_SUCCESS );
\r
413 configPRINTF( ( "Ping Response successfully received\r\n" ) );
\r
416 /************************ Unsubscribe from the topic. **************************/
\r
417 configPRINTF( ( "Attempt to unsubscribe from the MQTT topic %s\r\n", mqttexampleTOPIC ) );
\r
418 xReturned = prvMQTTUnsubscribeFromTopic( xMQTTSocket );
\r
419 configASSERT( xReturned == IOT_MQTT_SUCCESS );
\r
420 configPRINTF( ( "Unsubscribe from the topic %s\r\n", mqttexampleTOPIC ) );
\r
422 /**************************** Disconnect. ******************************/
\r
424 /* Sends an MQTT Disconnect packet over the already connected TCP socket
\r
425 * xMQTTSocket. No reply is read for a disconnect: prvMQTTDisconnect()
\r
426 * compares the FreeRTOS_send() result with the packet size and reports
\r
427 * IOT_MQTT_SUCCESS or IOT_MQTT_NETWORK_ERROR accordingly. */
/* NOTE(review): the two messages printed around prvMQTTDisconnect() are the
 * connect messages ("Creating an MQTT connection with", "Established an MQTT
 * connection."). They look copied from the connect step and misdescribe a
 * disconnect. */
\r
429 configPRINTF( ( "Creating an MQTT connection with %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
\r
430 xReturned = prvMQTTDisconnect( xMQTTSocket );
\r
431 configASSERT( xReturned == IOT_MQTT_SUCCESS );
\r
432 configPRINTF( ( "Established an MQTT connection.\r\n" ) );
\r
433 /* Disconnect from broker. */
\r
434 prvGracefulShutDown( xMQTTSocket );
\r
436 /* Wait for some time between two iterations to ensure that we do not
\r
437 * bombard the public test mosquitto broker. */
\r
438 configPRINTF( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u\r\n", xPortGetFreeHeapSize() ) );
\r
439 configPRINTF( ( "Short delay before starting the next iteration.... \r\n\r\n" ) );
/* NOTE(review): mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS is defined as
 * pdMS_TO_TICKS( 5000 ), so it is also converted to ticks twice here. */
\r
440 vTaskDelay( pdMS_TO_TICKS( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ) );
\r
443 /*-----------------------------------------------------------*/
\r
/**
 * @brief Creates a TCP socket and connects it to the configured MQTT broker.
 *
 * Creates a FREERTOS_AF_INET / FREERTOS_SOCK_STREAM / FREERTOS_IPPROTO_TCP
 * socket, resolves mqttexampleMQTT_BROKER_ENDPOINT with
 * FreeRTOS_gethostbyname() and, when the lookup result is non-zero, connects
 * to that address on mqttexampleMQTT_BROKER_PORT. If FreeRTOS_connect()
 * returns non-zero the socket is closed and FREERTOS_INVALID_SOCKET is
 * returned instead.
 *
 * NOTE(review): no handling is visible for a failed lookup
 * (ulBrokerIPAddress == 0). As listed, the open but unconnected socket would
 * be returned and the caller's FREERTOS_INVALID_SOCKET check would pass -
 * confirm against the complete function.
 *
 * NOTE(review): the prototype earlier in this file is declared static but
 * this definition omits the keyword. Linkage follows the earlier declaration,
 * but adding static here would keep the two consistent.
 *
 * @return The socket created here, or FREERTOS_INVALID_SOCKET when
 * FreeRTOS_connect() failed.
 */
445 Socket_t prvCreateTCPConnectionToBroker( void )
\r
447 Socket_t xMQTTSocket;
\r
448 struct freertos_sockaddr xBrokerAddress;
\r
449 uint32_t ulBrokerIPAddress;
\r
451 /* This is the socket used to connect to the MQTT broker. */
\r
452 xMQTTSocket = FreeRTOS_socket( FREERTOS_AF_INET,
\r
453 FREERTOS_SOCK_STREAM,
\r
454 FREERTOS_IPPROTO_TCP );
\r
456 configASSERT( xMQTTSocket != FREERTOS_INVALID_SOCKET );
\r
458 /* Locate then connect to the MQTT broker. */
\r
459 ulBrokerIPAddress = FreeRTOS_gethostbyname( mqttexampleMQTT_BROKER_ENDPOINT );
\r
461 if( ulBrokerIPAddress != 0 )
\r
463 xBrokerAddress.sin_port = FreeRTOS_htons( mqttexampleMQTT_BROKER_PORT );
\r
464 xBrokerAddress.sin_addr = ulBrokerIPAddress;
\r
466 if( FreeRTOS_connect( xMQTTSocket, &xBrokerAddress, sizeof( xBrokerAddress ) ) != 0 )
\r
468 /* Could not connect so delete socket and return an error. */
\r
469 FreeRTOS_closesocket( xMQTTSocket );
\r
470 xMQTTSocket = FREERTOS_INVALID_SOCKET;
\r
474 return xMQTTSocket;
\r
476 /*-----------------------------------------------------------*/
\r
/**
 * @brief Sends an MQTT CONNECT packet and checks the broker's CONNACK.
 *
 * Builds an IotMqttConnectInfo_t with cleanSession set and
 * mqttexampleCLIENT_IDENTIFIER as the client identifier, serializes it into
 * ucSharedBuffer and sends it with FreeRTOS_send(). The reply's type and
 * remaining length are read with IotMqtt_GetIncomingMQTTPacketTypeAndLength(),
 * using getNextByte() as the byte reader. The remaining bytes are read into
 * ucSharedBuffer and passed to IotMqtt_DeserializeResponse().
 *
 * Serializer failures, an oversized packet and an unexpected reply type are
 * caught with configASSERT() rather than returned to the caller.
 *
 * NOTE(review): the remaining bytes are read with a single FreeRTOS_recv()
 * call whose result must equal remainingLength, so a partial read appears to
 * be reported as IOT_MQTT_NETWORK_ERROR - confirm against the complete
 * function.
 *
 * @param[in] xMQTTSocket TCP socket already connected to the broker.
 *
 * @return The visible code assigns IOT_MQTT_SERVER_REFUSED when the response
 * does not deserialize successfully and IOT_MQTT_NETWORK_ERROR next to the
 * send and receive failure messages; otherwise xResult keeps the result of
 * the last successful call.
 */
478 static IotMqttError_t prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket )
\r
480 IotMqttConnectInfo_t xConnectInfo;
\r
481 size_t xRemainingLength = 0;
\r
482 size_t xPacketSize = 0;
\r
483 IotMqttError_t xResult;
\r
484 IotMqttPacketInfo_t xIncomingPacket;
\r
486 /* Many fields not used in this demo so start with everything at 0. */
\r
487 memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
\r
488 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( xIncomingPacket ) );
\r
490 /* Start with a clean session i.e. direct the MQTT broker to discard any
\r
491 * previous session data. Also, establishing a connection with clean session
\r
492 * will ensure that the broker does not store any data when this client
\r
493 * gets disconnected. */
\r
494 xConnectInfo.cleanSession = true;
\r
496 /* The client identifier is used to uniquely identify this MQTT client to
\r
497 * the MQTT broker. In a production device the identifier can be something
\r
498 * unique, such as a device serial number. */
\r
499 xConnectInfo.pClientIdentifier = mqttexampleCLIENT_IDENTIFIER;
\r
500 xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( mqttexampleCLIENT_IDENTIFIER );
\r
502 /* Get size requirement for the connect packet */
\r
503 xResult = IotMqtt_GetConnectPacketSize( &xConnectInfo, &xRemainingLength, &xPacketSize );
\r
504 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
505 /* Make sure the packet size is less than static buffer size */
\r
506 configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
\r
507 /* Serialize MQTT connect packet into provided buffer */
\r
508 xResult = IotMqtt_SerializeConnect( &xConnectInfo, xRemainingLength, ucSharedBuffer, xPacketSize );
\r
509 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
511 if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )
\r
513 /* Wait for the connection ack. TODO check the receive timeout value. */
\r
515 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
\r
517 /* Get packet type and remaining length of the received packet
\r
518 * We cannot assume received data is the connection acknowledgment.
\r
519 * Therefore this function reads type and remaining length of the
\r
520 * received packet, before processing entire packet.
\r
522 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
\r
523 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
524 configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_CONNACK );
\r
525 configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );
\r
527 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 )
\r
528 == ( BaseType_t ) xIncomingPacket.remainingLength )
\r
530 xIncomingPacket.pRemainingData = ucSharedBuffer;
\r
532 if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
\r
534 xResult = IOT_MQTT_SERVER_REFUSED;
\r
539 configPRINTF( ( "Receive Failed while receiving MQTT ConnAck\n" ) );
\r
540 xResult = IOT_MQTT_NETWORK_ERROR;
\r
545 configPRINTF( ( "Send Failed while connecting to MQTT broker\n" ) );
\r
546 xResult = IOT_MQTT_NETWORK_ERROR;
\r
551 /*-----------------------------------------------------------*/
\r
/**
 * @brief Sends an MQTT SUBSCRIBE for mqttexampleTOPIC and checks the SUBACK.
 *
 * A one-element IotMqttSubscription_t array is filled in with QoS 0 and the
 * topic filter. The packet size is obtained from
 * IotMqtt_GetSubscriptionPacketSize(), then the packet is serialized into
 * ucSharedBuffer and sent with FreeRTOS_send(). The reply header is read with
 * IotMqtt_GetIncomingMQTTPacketTypeAndLength(); the remaining bytes are read
 * into ucSharedBuffer and passed to IotMqtt_DeserializeResponse().
 *
 * NOTE(review): the existing comment says the next packet cannot be assumed
 * to be the SUBACK, yet the code asserts that its type is
 * MQTT_PACKET_TYPE_SUBACK. Any other packet arriving first would trip the
 * assert.
 *
 * NOTE(review): usPacketIdentifier is filled in by the serializer but is not
 * compared with anything in the visible code.
 *
 * @param[in] xMQTTSocket TCP socket with an established MQTT connection.
 *
 * @return The visible code assigns IOT_MQTT_BAD_RESPONSE when the response
 * does not deserialize successfully and IOT_MQTT_NETWORK_ERROR on what appear
 * to be the send and receive failure paths; otherwise xResult keeps the
 * result of the last successful call.
 */
553 static IotMqttError_t prvMQTTSubscribeToTopic( Socket_t xMQTTSocket )
\r
555 IotMqttError_t xResult;
\r
556 IotMqttSubscription_t xMQTTSubscription[ 1 ];
\r
557 size_t xRemainingLength = 0;
\r
558 size_t xPacketSize = 0;
\r
559 uint16_t usPacketIdentifier;
\r
560 IotMqttPacketInfo_t xIncomingPacket;
\r
562 /* Some fields not used by this demo so start with everything at 0. */
\r
563 memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
\r
565 /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to only one topic */
\r
566 xMQTTSubscription[ 0 ].qos = IOT_MQTT_QOS_0;
\r
567 xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
\r
568 xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
\r
570 xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_SUBSCRIBE,
\r
572 sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
\r
573 &xRemainingLength, &xPacketSize );
\r
574 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
575 /* Make sure the packet size is less than static buffer size */
\r
576 configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
\r
578 /* Serialize subscribe into statically allocated ucSharedBuffer */
\r
579 xResult = IotMqtt_SerializeSubscribe( xMQTTSubscription,
\r
580 sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
\r
582 &usPacketIdentifier,
\r
586 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
588 if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )
\r
590 /* Wait for the subscription ack. The socket is already connected to the MQTT broker, so
\r
591 * publishes to this client can occur at any time and we cannot assume received
\r
592 * data is the subscription acknowledgment. Therefore this function is, at this
\r
593 * time, doing what would otherwise be done wherever incoming packets are
\r
594 * interpreted (in a callback, or whatever). */
\r
595 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
\r
596 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
\r
597 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
598 configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_SUBACK );
\r
599 configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );
\r
601 /* Receive the remaining bytes. */
\r
602 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )
\r
604 xIncomingPacket.pRemainingData = ucSharedBuffer;
\r
606 if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
\r
608 xResult = IOT_MQTT_BAD_RESPONSE;
\r
613 xResult = IOT_MQTT_NETWORK_ERROR;
\r
618 xResult = IOT_MQTT_NETWORK_ERROR;
\r
623 /*-----------------------------------------------------------*/
\r
/**
 * @brief Publishes mqttexampleMESSAGE to mqttexampleTOPIC at QoS 0.
 *
 * Fills in an IotMqttPublishInfo_t (QoS 0, retain false, topic and payload
 * taken from the string macros), obtains the packet size from
 * IotMqtt_GetPublishPacketSize(), serializes the packet into ucSharedBuffer
 * with IotMqtt_SerializePublish() and sends it with FreeRTOS_send(). No
 * acknowledgment is read in this function.
 *
 * Serializer failures and an oversized packet are caught with
 * configASSERT() rather than returned to the caller.
 *
 * NOTE(review): usPacketIdentifier and pusPacketIdentifierHigh are handed to
 * the serializer by address but are not read afterwards in the visible code.
 *
 * @param[in] xMQTTSocket TCP socket with an established MQTT connection.
 *
 * @return IOT_MQTT_NETWORK_ERROR when FreeRTOS_send() does not report
 * xPacketSize bytes; IOT_MQTT_SUCCESS is the only other value assigned after
 * the send.
 */
625 static IotMqttError_t prvMQTTPublishToTopic( Socket_t xMQTTSocket )
\r
627 IotMqttError_t xResult;
\r
628 IotMqttPublishInfo_t xMQTTPublishInfo;
\r
629 size_t xRemainingLength = 0;
\r
630 size_t xPacketSize = 0;
\r
631 uint16_t usPacketIdentifier;
\r
632 uint8_t * pusPacketIdentifierHigh;
\r
634 /* Some fields not used by this demo so start with everything at 0. */
\r
635 memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );
\r
636 xMQTTPublishInfo.qos = IOT_MQTT_QOS_0;
\r
637 xMQTTPublishInfo.retain = false;
\r
638 xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;
\r
639 xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );
\r
640 xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
\r
641 xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );
\r
643 /* Find out length of Publish packet size. */
\r
644 xResult = IotMqtt_GetPublishPacketSize( &xMQTTPublishInfo, &xRemainingLength, &xPacketSize );
\r
645 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
646 /* Make sure the packet size is less than static buffer size */
\r
647 configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
\r
649 xResult = IotMqtt_SerializePublish( &xMQTTPublishInfo,
\r
651 &usPacketIdentifier,
\r
652 &pusPacketIdentifierHigh,
\r
655 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
657 if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) != ( BaseType_t ) xPacketSize )
\r
659 xResult = IOT_MQTT_NETWORK_ERROR;
\r
663 /* Send success. Since in this case, we are using IOT_MQTT_QOS_0,
\r
664 * there will not be any PubAck. Publish will be echoed back, which is processed
\r
665 * in prvMQTTProcessIncomingPublish() */
\r
666 xResult = IOT_MQTT_SUCCESS;
\r
671 /*-----------------------------------------------------------*/
\r
/**
 * @brief Receives one PUBLISH packet from the broker and prints its contents.
 *
 * Reads the packet type and remaining length with
 * IotMqtt_GetIncomingMQTTPacketTypeAndLength(), using getNextByte() as the
 * byte reader. It asserts that the upper four bits of the type equal
 * MQTT_PACKET_TYPE_PUBLISH and that the remaining length fits in
 * ucSharedBuffer. The remaining bytes are then read into ucSharedBuffer and
 * passed to IotMqtt_DeserializePublish(); the QoS, topic name and payload are
 * printed with configPRINTF().
 *
 * NOTE(review): the "%.*s" conversions need an int precision argument.
 * payloadLength is assigned straight from strlen() elsewhere in this file, so
 * it may be a size_t - confirm the field types and cast to int if so.
 *
 * @param[in] xMQTTSocket TCP socket with an established MQTT connection.
 *
 * @return The visible code assigns IOT_MQTT_BAD_RESPONSE when the publish
 * does not deserialize successfully and IOT_MQTT_NETWORK_ERROR on what
 * appears to be the receive failure path; otherwise xResult keeps the result
 * of reading the packet header.
 */
673 static IotMqttError_t prvMQTTProcessIncomingPublish( Socket_t xMQTTSocket )
\r
675 IotMqttError_t xResult;
\r
676 IotMqttPacketInfo_t xIncomingPacket;
\r
678 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
\r
679 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
\r
680 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
681 configASSERT( ( xIncomingPacket.type & 0xf0 ) == MQTT_PACKET_TYPE_PUBLISH );
\r
682 configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );
\r
684 /* Receive the remaining bytes. */
\r
685 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )
\r
687 xIncomingPacket.pRemainingData = ucSharedBuffer;
\r
689 if( IotMqtt_DeserializePublish( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
\r
691 xResult = IOT_MQTT_BAD_RESPONSE;
\r
695 /* Process incoming Publish */
\r
696 configPRINTF( ( "Incoming QOS : %d\n", xIncomingPacket.pubInfo.qos ) );
\r
697 configPRINTF( ( "Incoming Publish Topic Name: %.*s\n", xIncomingPacket.pubInfo.topicNameLength, xIncomingPacket.pubInfo.pTopicName ) );
\r
698 configPRINTF( ( "Incoming Publish Message : %.*s\n", xIncomingPacket.pubInfo.payloadLength, xIncomingPacket.pubInfo.pPayload ) );
\r
703 xResult = IOT_MQTT_NETWORK_ERROR;
\r
709 /*-----------------------------------------------------------*/
\r
/**
 * @brief Sends an MQTT UNSUBSCRIBE for mqttexampleTOPIC and checks the UNSUBACK.
 *
 * A one-element IotMqttSubscription_t array is filled in with QoS 0 and the
 * topic filter. The packet size is obtained from
 * IotMqtt_GetSubscriptionPacketSize() with IOT_MQTT_UNSUBSCRIBE, then the
 * packet is serialized into ucSharedBuffer and sent with FreeRTOS_send(). The
 * reply header is read with IotMqtt_GetIncomingMQTTPacketTypeAndLength(); the
 * remaining bytes are read into ucSharedBuffer and passed to
 * IotMqtt_DeserializeResponse().
 *
 * NOTE(review): xRemainingLength and xPacketSize are declared without
 * initializers here, unlike in the subscribe and publish functions. They are
 * only valid once the size query has written them.
 *
 * @param[in] xMQTTSocket TCP socket with an established MQTT connection.
 *
 * @return The visible code assigns IOT_MQTT_BAD_RESPONSE when the response
 * does not deserialize successfully and IOT_MQTT_NETWORK_ERROR on what appear
 * to be the send and receive failure paths; otherwise xResult keeps the
 * result of the last successful call.
 */
711 static IotMqttError_t prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket )
\r
713 IotMqttError_t xResult;
\r
714 IotMqttSubscription_t xMQTTSubscription[ 1 ];
\r
715 size_t xRemainingLength;
\r
716 size_t xPacketSize;
\r
717 uint16_t usPacketIdentifier;
\r
718 IotMqttPacketInfo_t xIncomingPacket;
\r
720 /* Some fields not used by this demo so start with everything at 0. */
\r
721 memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
\r
723 /* Unsubscribe from the mqttexampleTOPIC topic filter. Only one topic
\r
724 * filter is listed. No task handle or callback context is used in this
\r
725 * function. */
\r
726 xMQTTSubscription[ 0 ].qos = IOT_MQTT_QOS_0;
\r
727 xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
\r
728 xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
\r
730 xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_UNSUBSCRIBE,
\r
732 sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
\r
735 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
736 /* Make sure the packet size is less than static buffer size */
\r
737 configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
\r
739 xResult = IotMqtt_SerializeUnsubscribe( xMQTTSubscription,
\r
740 sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
\r
742 &usPacketIdentifier,
\r
745 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
747 if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )
\r
749 /* Wait for the unsubscribe ack (UNSUBACK). The socket is already connected to the MQTT broker, so
\r
750 * publishes to this client can occur at any time and we cannot assume received
\r
751 * data is the unsubscribe acknowledgment. Therefore this function is, at this
\r
752 * time, doing what would otherwise be done wherever incoming packets are
\r
753 * interpreted (in a callback, or whatever). */
\r
754 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
\r
755 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
\r
756 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
757 configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_UNSUBACK );
\r
758 configASSERT( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) );
\r
760 /* Receive the remaining bytes. */
\r
761 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )
\r
763 xIncomingPacket.pRemainingData = ucSharedBuffer;
\r
765 if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
\r
767 xResult = IOT_MQTT_BAD_RESPONSE;
\r
772 xResult = IOT_MQTT_NETWORK_ERROR;
\r
777 xResult = IOT_MQTT_NETWORK_ERROR;
\r
782 /*-----------------------------------------------------------*/
\r
/**
 * @brief Sends an MQTT PINGREQ and checks the broker's PINGRESP.
 *
 * Serializes a ping request of MQTT_PACKET_PINGREQ_SIZE bytes into
 * ucSharedBuffer with IotMqtt_SerializePingreq() and sends it with
 * FreeRTOS_send(). The reply header is read with
 * IotMqtt_GetIncomingMQTTPacketTypeAndLength(), using getNextByte() as the
 * byte reader. The code asserts that the type is MQTT_PACKET_TYPE_PINGRESP,
 * reads the remaining bytes into ucSharedBuffer and passes the packet to
 * IotMqtt_DeserializeResponse().
 *
 * NOTE(review): a PINGRESP presumably has a remaining length of zero, in
 * which case FreeRTOS_recv() is asked for 0 bytes and must return 0 for the
 * success path to be taken - confirm that behaviour.
 *
 * @param[in] xMQTTSocket TCP socket with an established MQTT connection.
 *
 * @return The visible code assigns IOT_MQTT_BAD_RESPONSE when the response
 * does not deserialize successfully and IOT_MQTT_NETWORK_ERROR on what appear
 * to be the send and receive failure paths; otherwise xResult keeps the
 * result of the last successful call.
 */
784 static IotMqttError_t prvMQTTKeepAlive( Socket_t xMQTTSocket )
\r
786 IotMqttError_t xResult;
\r
787 IotMqttPacketInfo_t xIncomingPacket;
\r
789 /* PingReq is fixed length packet, therefore there is no need to calculate the size,
\r
790 * just makes sure static buffer can accommodate ping request */
\r
792 configASSERT( MQTT_PACKET_PINGREQ_SIZE <= mqttexampleSHARED_BUFFER_SIZE );
\r
794 xResult = IotMqtt_SerializePingreq( ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE );
\r
795 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
797 if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE, 0 ) == ( BaseType_t ) MQTT_PACKET_PINGREQ_SIZE )
\r
799 memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
\r
800 xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
\r
801 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
802 configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_PINGRESP );
\r
803 configASSERT( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) );
\r
805 /* Receive the remaining bytes. */
\r
806 if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 )
\r
807 == ( BaseType_t ) xIncomingPacket.remainingLength )
\r
809 xIncomingPacket.pRemainingData = ucSharedBuffer;
\r
811 if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
\r
813 xResult = IOT_MQTT_BAD_RESPONSE;
\r
818 xResult = IOT_MQTT_NETWORK_ERROR;
\r
823 xResult = IOT_MQTT_NETWORK_ERROR;
\r
829 /*-----------------------------------------------------------*/
\r
/**
 * @brief Sends an MQTT DISCONNECT packet to the broker.
 *
 * Serializes a disconnect packet of MQTT_PACKET_DISCONNECT_SIZE bytes into
 * ucSharedBuffer with IotMqtt_SerializeDisconnect() and sends it with
 * FreeRTOS_send(). No reply is read in the visible code. The socket itself
 * is not closed here.
 *
 * NOTE(review): the end of this function is not visible in this view, so the
 * return statement is assumed to hand back xResult - confirm.
 *
 * @param[in] xMQTTSocket TCP socket with an established MQTT connection.
 *
 * @return IOT_MQTT_SUCCESS when FreeRTOS_send() reports
 * MQTT_PACKET_DISCONNECT_SIZE bytes; IOT_MQTT_NETWORK_ERROR is the only other
 * value assigned after the send.
 */
831 static IotMqttError_t prvMQTTDisconnect( Socket_t xMQTTSocket )
\r
833 IotMqttError_t xResult;
\r
835 /* Disconnect is fixed length packet, therefore there is no need to calculate the size,
\r
836 * just makes sure static buffer can accommodate disconnect request */
\r
838 configASSERT( MQTT_PACKET_DISCONNECT_SIZE <= mqttexampleSHARED_BUFFER_SIZE );
\r
840 xResult = IotMqtt_SerializeDisconnect( ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE );
\r
841 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
843 if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE, 0 ) == ( BaseType_t ) MQTT_PACKET_DISCONNECT_SIZE )
\r
845 xResult = IOT_MQTT_SUCCESS;
\r
849 xResult = IOT_MQTT_NETWORK_ERROR;
\r