2 * FreeRTOS Kernel V10.2.1
\r
3 * Copyright (C) 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://www.FreeRTOS.org
\r
23 * http://aws.amazon.com/freertos
\r
25 * 1 tab == 4 spaces!
\r
28 /* Standard inclues. */
\r
32 /* Kernel includes. */
\r
33 #include "FreeRTOS.h"
\r
36 /* IoT SDK includes. */
\r
37 #include "iot_mqtt.h"
\r
38 #include "iot_taskpool.h"
\r
39 #include "platform/iot_network_freertos.h"
\r
42 * @brief The keep-alive interval used for this example.
\r
44 * An MQTT ping request will be sent periodically at this interval.
\r
46 #define mqttexampleKEEP_ALIVE_SECONDS ( 60 )
\r
49 * @brief The timeout for MQTT operations in this example.
\r
51 #define mqttexampleMQTT_TIMEOUT_MS ( 5000 )
\r
54 * @brief The MQTT client identifier used in this example.
\r
56 #define mqttexampleCLIENT_IDENTIFIER "mqttexampleclient"
\r
59 * @brief Details of the MQTT broker to connect to.
\r
61 * @note This example does not use TLS and therefore won't work with AWS IoT.
\r
64 #define mqttexampleMQTT_BROKER_ENDPOINT "test.mosquitto.org"
\r
65 #define mqttexampleMQTT_BROKER_PORT 1883
\r
68 * @brief The topic to subscribe and publish to in the example.
\r
70 #define mqttexampleTOPIC "example/topic"
\r
73 * @brief The MQTT message published in this example.
\r
75 #define mqttexampleMESSAGE "Hello World!"
\r
78 * @brief Paramters to control the retry behaviour in case a QoS1 publish
\r
79 * message gets lost.
\r
81 * Retry every minutes up to a maximum of 5 retries.
\r
83 #define mqttexamplePUBLISH_RETRY_MS ( 1000 )
\r
84 #define mqttexamplePUBLISH_RETRY_LIMIT ( 5 )
\r
87 * @brief The bit which is set in the demo task's notification value from the
\r
88 * disconnect callback to inform the demo task about the MQTT disconnect.
\r
90 #define mqttexampleDISCONNECTED_BIT ( 1UL << 0UL )
\r
93 * @brief The bit which is set in the demo task's notification value from the
\r
94 * publish callback to inform the demo task about the message received from the
\r
97 #define mqttexampleMESSAGE_RECEIVED_BIT ( 1UL << 1UL )
\r
98 /*-----------------------------------------------------------*/
\r
101 * @brief The MQTT connection handle used in this example.
\r
103 static IotMqttConnection_t xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
\r
106 * @brief Parameters used to create the system task pool.
\r
108 static const IotTaskPoolInfo_t xTaskPoolParameters = {
\r
109 /* Minimum number of threads in a task pool.
\r
110 * Note the slimmed down version of the task
\r
111 * pool used by this library does not autoscale
\r
112 * the number of tasks in the pool so in this
\r
113 * case this sets the number of tasks in the
\r
116 /* Maximum number of threads in a task pool.
\r
117 * Note the slimmed down version of the task
\r
118 * pool used by this library does not autoscale
\r
119 * the number of tasks in the pool so in this
\r
120 * case this parameter is just ignored. */
\r
122 /* Stack size for every task pool thread - in
\r
123 * bytes, hence multiplying by the number of bytes
\r
124 * in a word as configMINIMAL_STACK_SIZE is
\r
125 * specified in words. */
\r
126 configMINIMAL_STACK_SIZE * sizeof( portSTACK_TYPE ),
\r
127 /* Priority for every task pool thread. */
\r
130 /*-----------------------------------------------------------*/
\r
133 * @brief The task used to demonstrate the MQTT API.
\r
135 * @param[in] pvParameters Parmaters as passed at the time of task creation. Not
\r
136 * used in this example.
\r
138 static void prvMQTTDemoTask( void *pvParameters );
\r
141 * @brief The callback invoked by the MQTT library when the MQTT connection gets
\r
144 * @param[in] pvCallbackContext Callback context as provided at the time of
\r
146 * @param[in] pxCallbackParams Contains the reason why the MQTT connection was
\r
149 static void prvExample_OnDisconnect( void * pvCallbackContext,
\r
150 IotMqttCallbackParam_t * pxCallbackParams );
\r
153 * @brief The callback invoked by the MQTT library when a message is received on
\r
154 * a subscribed topic from the MQTT broker.
\r
156 * @param[in] pvCallbackContext Callback context as provided at the time of
\r
158 * @param[in] pxCallbackParams Contain the details about the received message -
\r
159 * topic on which the message was received, the received message.
\r
161 static void prvExample_OnMessageReceived( void * pvCallbackContext,
\r
162 IotMqttCallbackParam_t * pxCallbackParams );
\r
165 * @brief Connects to the MQTT broker as specified in mqttexampleMQTT_BROKER_ENDPOINT
\r
166 * and mqttexampleMQTT_BROKER_PORT.
\r
168 * @note This example does not use TLS and therefore will not work with MQTT.
\r
170 static void prvMQTTConnect( void );
\r
173 * @brief Subscribes to the topic as specified in mqttexampleTOPIC.
\r
175 static void prvMQTTSubscribe( void );
\r
178 * @brief Publishes a messages mqttexampleMESSAGE on mqttexampleTOPIC topic.
\r
180 static void prvMQTTPublish( void );
\r
183 * @brief Unsubscribes from the mqttexampleTOPIC topic.
\r
185 static void prvMQTTUnsubscribe( void );
\r
188 * @brief Disconnects from the MQTT broker gracefully by sending an MQTT
\r
189 * DISCONNECT message.
\r
191 static void prvMQTTDisconnect( void );
\r
192 /*-----------------------------------------------------------*/
\r
194 static void prvExample_OnDisconnect( void * pvCallbackContext,
\r
195 IotMqttCallbackParam_t * pxCallbackParams )
\r
197 TaskHandle_t xDemoTaskHandle = ( TaskHandle_t ) pvCallbackContext;
\r
199 /* Ensure that we initiated the disconnect. */
\r
200 configASSERT( pxCallbackParams->u.disconnectReason == IOT_MQTT_DISCONNECT_CALLED );
\r
202 /* Inform the demo task about the disconnect. */
\r
203 xTaskNotify( xDemoTaskHandle,
\r
204 mqttexampleDISCONNECTED_BIT,
\r
205 eSetBits /* Set the mqttexampleDISCONNECTED_BIT in the demo task's notification value. */
\r
208 /*-----------------------------------------------------------*/
\r
210 static void prvExample_OnMessageReceived( void * pvCallbackContext,
\r
211 IotMqttCallbackParam_t * pxCallbackParams )
\r
213 TaskHandle_t xDemoTaskHandle = ( TaskHandle_t ) pvCallbackContext;
\r
215 /* Ensure that the message is received on the expected topic. */
\r
216 configASSERT( pxCallbackParams->u.message.info.topicNameLength == strlen( mqttexampleTOPIC ) );
\r
217 configASSERT( strncmp( pxCallbackParams->u.message.info.pTopicName,
\r
219 strlen( mqttexampleTOPIC ) ) == 0 );
\r
221 /* Ensure that the expected message is received. */
\r
222 configASSERT( pxCallbackParams->u.message.info.payloadLength == strlen( mqttexampleMESSAGE ) );
\r
223 configASSERT( strncmp( pxCallbackParams->u.message.info.pPayload,
\r
224 mqttexampleMESSAGE,
\r
225 strlen( mqttexampleMESSAGE ) ) == 0 );
\r
227 /* Ensure that the message QoS is as expected. */
\r
228 configASSERT( pxCallbackParams->u.message.info.qos == IOT_MQTT_QOS_1 );
\r
230 /* Although this print uses the constants rather than the data from the
\r
231 message payload the asserts above have already checked the message payload
\r
232 equals the constants, and it is more efficient not to have to worry about
\r
233 lengths in the print. */
\r
234 configPRINTF( ( "Received %s from topic %s\r\n", mqttexampleMESSAGE, mqttexampleTOPIC ) );
\r
236 /* Inform the demo task about the message received from the MQTT broker. */
\r
237 xTaskNotify( xDemoTaskHandle,
\r
238 mqttexampleMESSAGE_RECEIVED_BIT,
\r
239 eSetBits /* Set the mqttexampleMESSAGE_RECEIVED_BIT in the demo task's notification value. */
\r
242 /*-----------------------------------------------------------*/
\r
244 void vStartSimpleMQTTDemo( void )
\r
246 /* This example uses a single application task, which in turn is used to
\r
247 * connect, subscribe, publish, unsubscribe and disconnect from the MQTT
\r
249 xTaskCreate( prvMQTTDemoTask, /* Function that implements the task. */
\r
250 "MQTTDemo", /* Text name for the task - only used for debugging. */
\r
251 configMINIMAL_STACK_SIZE, /* Size of stack (in words, not bytes) to allocate for the task. */
\r
252 NULL, /* Task parameter - not used in this case. */
\r
253 tskIDLE_PRIORITY, /* Task priority, must be between 0 and configMAX_PRIORITIES - 1. */
\r
254 NULL ); /* Used to pass out a handle to the created task - not used in this case. */
\r
256 /*-----------------------------------------------------------*/
\r
258 static void prvMQTTDemoTask( void *pvParameters )
\r
260 IotMqttError_t xResult;
\r
261 uint32_t ulNotificationValue = 0, ulIterations;
\r
262 const uint32_t ulMaxIterations = 5UL;
\r
263 const TickType_t xNoDelay = ( TickType_t ) 0;
\r
265 /* Remove compiler warnings about unused parameters. */
\r
266 ( void ) pvParameters;
\r
268 /* The MQTT library needs a task pool, so create the system task pool. */
\r
269 xResult = IotTaskPool_CreateSystemTaskPool( &( xTaskPoolParameters ) );
\r
270 configASSERT( xResult == IOT_TASKPOOL_SUCCESS );
\r
272 /* MQTT library must be initialized before it can be used. This is just one
\r
273 * time initialization. */
\r
274 xResult = IotMqtt_Init();
\r
275 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
279 /* Don't expect any notifications to be pending yet. */
\r
280 configASSERT( ulTaskNotifyTake( pdTRUE, xNoDelay ) == 0 );
\r
283 /******************** CONNECT ****************************************/
\r
285 /* Establish a connection to the MQTT broker. This example connects to
\r
286 * the MQTT broker as specified in mqttexampleMQTT_BROKER_ENDPOINT and
\r
287 * mqttexampleMQTT_BROKER_PORT at the top of this file. Please change
\r
288 * it to the MQTT broker you want to connect to. Note that this example
\r
289 * does not use TLS and therefore will not work with AWS IoT. */
\r
291 configPRINTF( ( "Connected to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
\r
294 /******************* SUBSCRIBE ***************************************/
\r
296 /* The client is now connected to the broker. Subscribe to the topic
\r
297 as specified in mqttexampleTOPIC at the top of this file. This client
\r
298 will then publish to the same topic it subscribed to, so will expect
\r
299 all the messages it sends to the broker to be sent back to it from the
\r
301 prvMQTTSubscribe();
\r
302 configPRINTF( ( "Subscribed to %s\r\n", mqttexampleTOPIC ) );
\r
305 /******************* PUBLISH 5 TIMES *********************************/
\r
307 /* Publish a few messages while connected. */
\r
308 for( ulIterations = 0; ulIterations < ulMaxIterations; ulIterations++ )
\r
310 /* Publish a message on the mqttexampleTOPIC topic as specified at the
\r
311 * top of this file. */
\r
313 configPRINTF( ( "Published %s to %s\r\n", mqttexampleMESSAGE, mqttexampleTOPIC ) );
\r
315 /* Since we are subscribed on the same topic, we will get the same
\r
316 * message back from the MQTT broker. Wait for the message to be
\r
317 * received which is informed to us by the publish callback
\r
318 * (prvExample_OnMessageReceived) by setting the
\r
319 * mqttexampleMESSAGE_RECEIVED_BIT in this task's notification
\r
321 xTaskNotifyWait( 0UL, /* Don't clear any bits on entry. */
\r
322 mqttexampleMESSAGE_RECEIVED_BIT, /* Clear bit on exit. */
\r
323 &( ulNotificationValue ), /* Obtain the notification value. */
\r
324 pdMS_TO_TICKS( mqttexampleMQTT_TIMEOUT_MS ) );
\r
325 configASSERT( ( ulNotificationValue & mqttexampleMESSAGE_RECEIVED_BIT ) == mqttexampleMESSAGE_RECEIVED_BIT );
\r
329 /******************* UNSUBSCRIBE AND DISCONNECT **********************/
\r
331 /* Unsubscribe from the topic mqttexampleTOPIC the disconnect
\r
333 prvMQTTUnsubscribe();
\r
334 prvMQTTDisconnect();
\r
335 configPRINTF( ( "Disconnected from from %s\r\n\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
\r
337 /* Wait for the disconnect operation to complete which is informed to us
\r
338 * by the disconnect callback (prvExample_OnDisconnect)by setting
\r
339 * the mqttexampleDISCONNECTED_BIT in this task's notification value.
\r
340 * Note that all bits are cleared in the task's notification value to
\r
341 * ensure that it is ready for the next run. */
\r
342 xTaskNotifyWait( 0UL, /* Don't clear any bits on entry. */
\r
343 mqttexampleDISCONNECTED_BIT, /* Clear bit again on exit. */
\r
344 &( ulNotificationValue ), /* Obtain the notification value. */
\r
345 pdMS_TO_TICKS( mqttexampleMQTT_TIMEOUT_MS ) );
\r
346 configASSERT( ( ulNotificationValue & mqttexampleDISCONNECTED_BIT ) == mqttexampleDISCONNECTED_BIT );
\r
350 /* Wait for some time between two iterations to ensure that we do not
\r
351 * bombard the public test mosquitto broker. */
\r
352 configPRINTF( ( "prvMQTTDemoTask() completed an iteration without hitting an assert. Total free heap is %u\r\n\r\n", xPortGetFreeHeapSize() ) );
\r
353 vTaskDelay( pdMS_TO_TICKS( 5000 ) );
\r
356 /*-----------------------------------------------------------*/
\r
358 static void prvMQTTConnect( void )
\r
360 IotMqttError_t xResult;
\r
361 IotNetworkServerInfo_t xMQTTBrokerInfo;
\r
362 IotMqttNetworkInfo_t xNetworkInfo = IOT_MQTT_NETWORK_INFO_INITIALIZER;
\r
363 IotMqttConnectInfo_t xConnectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;
\r
365 /******************* Broker information setup. **********************/
\r
366 xMQTTBrokerInfo.pHostName = mqttexampleMQTT_BROKER_ENDPOINT;
\r
367 xMQTTBrokerInfo.port = mqttexampleMQTT_BROKER_PORT;
\r
369 /******************* Network information setup. **********************/
\r
370 /* No connection to the MQTT broker has been established yet and we want to
\r
371 * establish a new connection. */
\r
372 xNetworkInfo.createNetworkConnection = true;
\r
373 xNetworkInfo.u.setup.pNetworkServerInfo = &( xMQTTBrokerInfo );
\r
375 /* This example does not use TLS and therefore pNetworkCredentialInfo must
\r
376 * be set to NULL. */
\r
377 xNetworkInfo.u.setup.pNetworkCredentialInfo = NULL;
\r
379 /* Use FreeRTOS+TCP network. */
\r
380 xNetworkInfo.pNetworkInterface = IOT_NETWORK_INTERFACE_FREERTOS;
\r
382 /* Setup the callback which is called when the MQTT connection is disconnected. */
\r
383 xNetworkInfo.disconnectCallback.pCallbackContext = ( void * ) xTaskGetCurrentTaskHandle();//_RB_ Why the task handle?
\r
384 xNetworkInfo.disconnectCallback.function = prvExample_OnDisconnect;
\r
386 /****************** MQTT Connection information setup. ********************/
\r
387 /* Set this flag to true if connecting to the AWS IoT MQTT broker. This
\r
388 example does not use TLS and therefore won't work with AWS IoT. */
\r
389 xConnectInfo.awsIotMqttMode = false;
\r
391 /* Start with a clean session i.e. direct the MQTT broker to discard any
\r
392 * previous session data. Also, establishing a connection with clean session
\r
393 * will ensure that the broker does not store any data when this client
\r
394 * gets disconnected. */
\r
395 xConnectInfo.cleanSession = true;
\r
397 /* Since we are starting with a clean session, there are no previous
\r
398 * subscriptions to be restored. */
\r
399 xConnectInfo.pPreviousSubscriptions = NULL;
\r
400 xConnectInfo.previousSubscriptionCount = 0;
\r
402 /* We do not want to publish Last Will and Testament (LWT) message if the
\r
403 * client gets disconnected. */
\r
404 xConnectInfo.pWillInfo = NULL;
\r
406 /* Send an MQTT PING request every minute to keep the connection open if
\r
407 there is no other MQTT trafic. */
\r
408 xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_SECONDS;
\r
410 /* The client identifier is used to uniquely identify this MQTT client to
\r
411 * the MQTT broker. In a production device the identifier can be something
\r
412 unique, such as a device serial number. */
\r
413 xConnectInfo.pClientIdentifier = mqttexampleCLIENT_IDENTIFIER;
\r
414 xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( mqttexampleCLIENT_IDENTIFIER );
\r
416 /* This example does not use any authentication and therefore username and
\r
417 * password fields are not used. */
\r
418 xConnectInfo.pUserName = NULL;
\r
419 xConnectInfo.userNameLength = 0;
\r
420 xConnectInfo.pPassword = NULL;
\r
421 xConnectInfo.passwordLength = 0;
\r
423 /* Establish the connection to the MQTT broker - It is a blocking call and
\r
424 will return only when connection is complete or a timeout occurrs. */
\r
425 xResult = IotMqtt_Connect( &( xNetworkInfo ),
\r
427 mqttexampleMQTT_TIMEOUT_MS,
\r
428 &( xMQTTConnection ) );
\r
429 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
431 /*-----------------------------------------------------------*/
\r
433 static void prvMQTTSubscribe( void )
\r
435 IotMqttError_t xResult;
\r
436 IotMqttSubscription_t xMQTTSubscription;
\r
438 /* Subscribe to the mqttexampleTOPIC topic filter. */
\r
439 xMQTTSubscription.qos = IOT_MQTT_QOS_1;
\r
440 xMQTTSubscription.pTopicFilter = mqttexampleTOPIC;
\r
441 xMQTTSubscription.topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
\r
442 xMQTTSubscription.callback.pCallbackContext = ( void * ) xTaskGetCurrentTaskHandle();
\r
443 xMQTTSubscription.callback.function = prvExample_OnMessageReceived;
\r
445 /* Use the synchronous API to subscribe - It is a blocking call and only
\r
446 * returns when the subscribe operation is complete. */
\r
447 xResult = IotMqtt_TimedSubscribe( xMQTTConnection,
\r
448 &( xMQTTSubscription ),
\r
449 1, /* We are subscribing to one topic filter. */
\r
450 0, /* flags - currently ignored. */
\r
451 mqttexampleMQTT_TIMEOUT_MS );
\r
452 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
454 /*-----------------------------------------------------------*/
\r
456 static void prvMQTTPublish( void )
\r
458 IotMqttError_t xResult;
\r
459 IotMqttPublishInfo_t xMQTTPublishInfo;
\r
461 /* Publish a message with QoS1 on the mqttexampleTOPIC topic. Since we are
\r
462 * subscribed to the same topic, the MQTT broker will send the same message
\r
463 * back to us. It is verified in the publish callback. */
\r
464 xMQTTPublishInfo.qos = IOT_MQTT_QOS_1;
\r
465 xMQTTPublishInfo.retain = false;
\r
466 xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;
\r
467 xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );
\r
468 xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
\r
469 xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );
\r
470 xMQTTPublishInfo.retryMs = mqttexamplePUBLISH_RETRY_MS;
\r
471 xMQTTPublishInfo.retryLimit = mqttexamplePUBLISH_RETRY_LIMIT;
\r
473 /* Use the synchronous API to publish - It is a blocking call and only
\r
474 * returns when the publish operation is complete. */
\r
475 xResult = IotMqtt_TimedPublish( xMQTTConnection,
\r
476 &( xMQTTPublishInfo ),
\r
477 0, /* flags - currently ignored. */
\r
478 mqttexampleMQTT_TIMEOUT_MS );
\r
479 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
481 /*-----------------------------------------------------------*/
\r
483 static void prvMQTTUnsubscribe( void )
\r
485 IotMqttError_t xResult;
\r
486 IotMqttSubscription_t xMQTTSubscription;
\r
488 /* Unsubscribe from the mqttexampleTOPIC topic filter. */
\r
489 xMQTTSubscription.pTopicFilter = mqttexampleTOPIC;
\r
490 xMQTTSubscription.topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
\r
491 /* The following members of the IotMqttSubscription_t are ignored by the
\r
492 * unsubscribe operation. Just initialize them to avoid "use of uninitialized
\r
493 * variable" warnings. */
\r
494 xMQTTSubscription.qos = IOT_MQTT_QOS_1;
\r
495 xMQTTSubscription.callback.pCallbackContext = NULL;
\r
496 xMQTTSubscription.callback.function = NULL;
\r
498 /* Use the synchronous API to unsubscribe - It is a blocking call and only
\r
499 * returns when the unsubscribe operation is complete. */
\r
500 xResult = IotMqtt_TimedUnsubscribe( xMQTTConnection,
\r
501 &( xMQTTSubscription ),
\r
502 1, /* We are unsubscribing from one topic filter. */
\r
503 0, /* flags - currently ignored. */
\r
504 mqttexampleMQTT_TIMEOUT_MS );
\r
505 configASSERT( xResult == IOT_MQTT_SUCCESS );
\r
507 /*-----------------------------------------------------------*/
\r
509 static void prvMQTTDisconnect( void )
\r
511 /* Send a MQTT DISCONNECT packet to the MQTT broker to do a graceful
\r
513 IotMqtt_Disconnect( xMQTTConnection,
\r
514 0 /* flags - 0 means a graceful disconnect by sending MQTT DISCONNECT. */
\r
517 /*-----------------------------------------------------------*/
\r