]> git.sur5r.net Git - freertos/blob - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_agent.c
Correct an err in queue.c introduced when previously updating behaviour when queue...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_agent.c
1 /*\r
2  * Amazon FreeRTOS MQTT V2.0.0\r
3  * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.\r
4  *\r
5  * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
6  * this software and associated documentation files (the "Software"), to deal in\r
7  * the Software without restriction, including without limitation the rights to\r
8  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
9  * the Software, and to permit persons to whom the Software is furnished to do so,\r
10  * subject to the following conditions:\r
11  *\r
12  * The above copyright notice and this permission notice shall be included in all\r
13  * copies or substantial portions of the Software.\r
14  *\r
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
17  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
18  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
19  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
21  *\r
22  * http://aws.amazon.com/freertos\r
23  * http://www.FreeRTOS.org\r
24  */\r
25 \r
26 /**\r
27  * @file iot_mqtt_agent.c\r
28  * @brief MQTT Agent implementation. Provides backwards compatibility between\r
29  * MQTT v2 and MQTT v1.\r
30  */\r
31 \r
32 /* The config header is always included first. */\r
33 #include "iot_config.h"\r
34 \r
35 /* Standard includes. */\r
36 #include <string.h>\r
37 \r
38 /* FreeRTOS includes. */\r
39 #include "FreeRTOS.h"\r
40 #include "semphr.h"\r
41 \r
42 /* MQTT v1 includes. */\r
43 #include "iot_mqtt_agent.h"\r
44 #include "iot_mqtt_agent_config.h"\r
45 #include "iot_mqtt_agent_config_defaults.h"\r
46 \r
47 /* MQTT v2 include. */\r
48 #include "iot_mqtt.h"\r
49 \r
50 /* Platform network include. */\r
51 #include "platform/iot_network_freertos.h"\r
52 \r
53 /*-----------------------------------------------------------*/\r
54 \r
55 /**\r
56  * @brief Converts FreeRTOS ticks to milliseconds.\r
57  */\r
58 #define mqttTICKS_TO_MS( xTicks )    ( xTicks * 1000 / configTICK_RATE_HZ )\r
59 \r
60 /*-----------------------------------------------------------*/\r
61 \r
62 /**\r
63  * @brief Stores data to convert between the MQTT v1 subscription callback\r
64  * and the MQTT v2 subscription callback.\r
65  */\r
66 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )\r
67     typedef struct MQTTCallback\r
68     {\r
69         BaseType_t xInUse;                                                     /**< Whether this instance is in-use. */\r
70         MQTTPublishCallback_t xFunction;                                       /**< MQTT v1 callback function. */\r
71         void * pvParameter;                                                    /**< Parameter to xFunction. */\r
72 \r
73         uint16_t usTopicFilterLength;                                          /**< Length of pcTopicFilter. */\r
74         char pcTopicFilter[ mqttconfigSUBSCRIPTION_MANAGER_MAX_TOPIC_LENGTH ]; /**< Topic filter. */\r
75     } MQTTCallback_t;\r
76 #endif\r
77 \r
78 /**\r
79  * @brief Stores data on an active MQTT connection.\r
80  */\r
81 typedef struct MQTTConnection\r
82 {\r
83     IotMqttConnection_t xMQTTConnection; /**< MQTT v2 connection handle. */\r
84     MQTTAgentCallback_t pxCallback;      /**< MQTT v1 global callback. */\r
85     void * pvUserData;                   /**< Parameter to pxCallback. */\r
86     StaticSemaphore_t xConnectionMutex;  /**< Protects from concurrent accesses. */\r
87     #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )\r
88         MQTTCallback_t xCallbacks        /**< Conversion table of MQTT v1 to MQTT v2 subscription callbacks. */\r
89         [ mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS ];\r
90     #endif\r
91 } MQTTConnection_t;\r
92 \r
93 /*-----------------------------------------------------------*/\r
94 \r
95 /**\r
96  * @brief Convert an MQTT v2 return code to an MQTT v1 return code.\r
97  *\r
98  * @param[in] xMqttStatus The MQTT v2 return code.\r
99  *\r
100  * @return An equivalent MQTT v1 return code.\r
101  */\r
102 static inline MQTTAgentReturnCode_t prvConvertReturnCode( IotMqttError_t xMqttStatus );\r
103 \r
104 /**\r
105  * @brief Wraps an MQTT v1 publish callback.\r
106  *\r
107  * @param[in] pvParameter The MQTT connection.\r
108  * @param[in] pxPublish Information about the incoming publish.\r
109  */\r
110 static void prvPublishCallbackWrapper( void * pvParameter,\r
111                                        IotMqttCallbackParam_t * const pxPublish );\r
112 \r
113 /**\r
114  * @brief Wraps an MQTT v1 disconnect callback.\r
115  *\r
116  * @param[in] pvCallbackContext The MQTT connection.\r
117  * @param[in] pxDisconnect Information about the disconnect.\r
118  */\r
119 static void prvDisconnectCallbackWrapper( void * pvParameter,\r
120                                           IotMqttCallbackParam_t * pxDisconnect );\r
121 \r
122 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )\r
123 \r
124 /**\r
125  * @brief Store an MQTT v1 callback in the conversion table.\r
126  *\r
127  * @param[in] pxConnection Where to store the callback.\r
128  * @param[in] pcTopicFilter Topic filter to store.\r
129  * @param[in] usTopicFilterLength Length of pcTopicFilter.\r
130  * @param[in] xCallback MQTT v1 callback to store.\r
131  * @param[in] pvParameter Parameter to xCallback.\r
132  *\r
133  * @return pdPASS if the callback was successfully stored; pdFAIL otherwise.\r
134  */\r
135     static BaseType_t prvStoreCallback( MQTTConnection_t * const pxConnection,\r
136                                         const char * const pcTopicFilter,\r
137                                         uint16_t usTopicFilterLength,\r
138                                         MQTTPublishCallback_t xCallback,\r
139                                         void * pvParameter );\r
140 \r
141 /**\r
142  * @brief Search the callback conversion table for the given topic filter.\r
143  *\r
144  * @param[in] pxConnection The connection containing the conversion table.\r
145  * @param[in] pcTopicFilter The topic filter to search for.\r
146  * @param[in] usTopicFilterLength The length of pcTopicFilter.\r
147  *\r
148  * @return A pointer to the callback entry if found; NULL otherwise.\r
149  * @note This function should be called with pxConnection->xConnectionMutex\r
150  * locked.\r
151  */\r
152     static MQTTCallback_t * prvFindCallback( MQTTConnection_t * const pxConnection,\r
153                                              const char * const pcTopicFilter,\r
154                                              uint16_t usTopicFilterLength );\r
155 \r
156 /**\r
157  * @brief Remove a topic filter from the callback conversion table.\r
158  *\r
159  * @param[in] pxConnection The connection containing the conversion table.\r
160  * @param[in] pcTopicFilter The topic filter to remove.\r
161  * @param[in] usTopicFilterLength The length of pcTopic.\r
162  */\r
163     static void prvRemoveCallback( MQTTConnection_t * const pxConnection,\r
164                                    const char * const pcTopicFilter,\r
165                                    uint16_t usTopicFilterLength );\r
166 #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */\r
167 \r
168 /*-----------------------------------------------------------*/\r
169 \r
170 /**\r
171  * @brief The number of available MQTT brokers, controlled by the constant\r
172  * mqttconfigMAX_BROKERS;\r
173  */\r
174 static UBaseType_t uxAvailableBrokers = mqttconfigMAX_BROKERS;\r
175 \r
176 /*-----------------------------------------------------------*/\r
177 \r
178 static inline MQTTAgentReturnCode_t prvConvertReturnCode( IotMqttError_t xMqttStatus )\r
179 {\r
180     MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;\r
181 \r
182     switch( xMqttStatus )\r
183     {\r
184         case IOT_MQTT_SUCCESS:\r
185         case IOT_MQTT_STATUS_PENDING:\r
186             xStatus = eMQTTAgentSuccess;\r
187             break;\r
188 \r
189         case IOT_MQTT_TIMEOUT:\r
190             xStatus = eMQTTAgentTimeout;\r
191             break;\r
192 \r
193         default:\r
194             xStatus = eMQTTAgentFailure;\r
195             break;\r
196     }\r
197 \r
198     return xStatus;\r
199 }\r
200 \r
201 /*-----------------------------------------------------------*/\r
202 \r
203 static void prvPublishCallbackWrapper( void * pvParameter,\r
204                                        IotMqttCallbackParam_t * const pxPublish )\r
205 {\r
206     BaseType_t xStatus = pdPASS;\r
207     size_t xBufferSize = 0;\r
208     uint8_t * pucMqttBuffer = NULL;\r
209     MQTTBool_t xCallbackReturn = eMQTTFalse;\r
210     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) pvParameter;\r
211     MQTTAgentCallbackParams_t xPublishData = { .xMQTTEvent = eMQTTAgentPublish };\r
212 \r
213     /* Calculate the size of the MQTT buffer that must be allocated. */\r
214     if( xStatus == pdPASS )\r
215     {\r
216         xBufferSize = pxPublish->u.message.info.topicNameLength +\r
217                       pxPublish->u.message.info.payloadLength;\r
218 \r
219         /* Check for overflow. */\r
220         if( ( xBufferSize < pxPublish->u.message.info.topicNameLength ) ||\r
221             ( xBufferSize < pxPublish->u.message.info.payloadLength ) )\r
222         {\r
223             mqttconfigDEBUG_LOG( ( "Incoming PUBLISH message and topic name length too large.\r\n" ) );\r
224             xStatus = pdFAIL;\r
225         }\r
226     }\r
227 \r
228     /* Allocate an MQTT buffer for the callback. */\r
229     if( xStatus == pdPASS )\r
230     {\r
231         pucMqttBuffer = pvPortMalloc( xBufferSize );\r
232 \r
233         if( pucMqttBuffer == NULL )\r
234         {\r
235             mqttconfigDEBUG_LOG( ( "Failed to allocate memory for MQTT buffer.\r\n" ) );\r
236             xStatus = pdFAIL;\r
237         }\r
238         else\r
239         {\r
240             /* Copy the topic name and payload. The topic name and payload must be\r
241              * copied in case the user decides to take ownership of the MQTT buffer.\r
242              * The original buffer containing the MQTT topic name and payload may\r
243              * contain further unprocessed packets and must remain property of the\r
244              * MQTT library. Therefore, the topic name and payload are copied into\r
245              * another buffer for the user. */\r
246             ( void ) memcpy( pucMqttBuffer,\r
247                              pxPublish->u.message.info.pTopicName,\r
248                              pxPublish->u.message.info.topicNameLength );\r
249             ( void ) memcpy( pucMqttBuffer + pxPublish->u.message.info.topicNameLength,\r
250                              pxPublish->u.message.info.pPayload,\r
251                              pxPublish->u.message.info.payloadLength );\r
252 \r
253             /* Set the members of the callback parameter. */\r
254             xPublishData.xMQTTEvent = eMQTTAgentPublish;\r
255             xPublishData.u.xPublishData.pucTopic = pucMqttBuffer;\r
256             xPublishData.u.xPublishData.usTopicLength = pxPublish->u.message.info.topicNameLength;\r
257             xPublishData.u.xPublishData.pvData = pucMqttBuffer + pxPublish->u.message.info.topicNameLength;\r
258             xPublishData.u.xPublishData.ulDataLength = ( uint32_t ) pxPublish->u.message.info.payloadLength;\r
259             xPublishData.u.xPublishData.xQos = ( MQTTQoS_t ) pxPublish->u.message.info.qos;\r
260             xPublishData.u.xPublishData.xBuffer = pucMqttBuffer;\r
261         }\r
262     }\r
263 \r
264     if( xStatus == pdPASS )\r
265     {\r
266         #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )\r
267             /* When subscription management is enabled, search for a matching subscription. */\r
268             MQTTCallback_t * pxCallbackEntry = prvFindCallback( pxConnection,\r
269                                                                 pxPublish->u.message.pTopicFilter,\r
270                                                                 pxPublish->u.message.topicFilterLength );\r
271 \r
272             /* Check if a matching MQTT v1 subscription was found. */\r
273             if( pxCallbackEntry != NULL )\r
274             {\r
275                 /* Invoke the topic-specific callback if it exists. */\r
276                 if( pxCallbackEntry->xFunction != NULL )\r
277                 {\r
278                     xCallbackReturn = pxCallbackEntry->xFunction( pxCallbackEntry->pvParameter,\r
279                                                                   &( xPublishData.u.xPublishData ) );\r
280                 }\r
281                 else\r
282                 {\r
283                     /* Otherwise, invoke the global callback. */\r
284                     if( pxConnection->pxCallback != NULL )\r
285                     {\r
286                         xCallbackReturn = ( MQTTBool_t ) pxConnection->pxCallback( pxConnection->pvUserData,\r
287                                                                                    &xPublishData );\r
288                     }\r
289                 }\r
290             }\r
291         #else /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */\r
292 \r
293             /* When subscription management is disabled, invoke the global callback\r
294              * if one exists. */\r
295 \r
296             /* When subscription management is disabled, the topic filter must be "#". */\r
297             mqttconfigASSERT( *( xPublish.message.pTopicFilter ) == '#' );\r
298             mqttconfigASSERT( xPublish.message.topicFilterLength == 1 );\r
299 \r
300             if( pxConnection->pxCallback != NULL )\r
301             {\r
302                 xCallbackReturn = ( MQTTBool_t ) pxConnection->pxCallback( pxConnection->pvUserData,\r
303                                                                            &xPublishData );\r
304             }\r
305         #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */\r
306     }\r
307 \r
308     /* Free the MQTT buffer if the user did not take ownership of it. */\r
309     if( ( xCallbackReturn == eMQTTFalse ) && ( pucMqttBuffer != NULL ) )\r
310     {\r
311         vPortFree( pucMqttBuffer );\r
312     }\r
313 }\r
314 \r
315 /*-----------------------------------------------------------*/\r
316 \r
317 static void prvDisconnectCallbackWrapper( void * pvParameter,\r
318                                           IotMqttCallbackParam_t * pxDisconnect )\r
319 {\r
320     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) pvParameter;\r
321     MQTTAgentCallbackParams_t xCallbackParams = { .xMQTTEvent = eMQTTAgentDisconnect };\r
322 \r
323     ( void ) pxDisconnect;\r
324 \r
325     /* This function should only be called if a callback was set. */\r
326     mqttconfigASSERT( pxConnection->pxCallback != NULL );\r
327 \r
328     /* Invoke the MQTT v1 callback. Ignore the return value. */\r
329     pxConnection->pxCallback( pxConnection->pvUserData,\r
330                               &xCallbackParams );\r
331 }\r
332 \r
333 /*-----------------------------------------------------------*/\r
334 \r
335 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )\r
336     static BaseType_t prvStoreCallback( MQTTConnection_t * const pxConnection,\r
337                                         const char * const pcTopicFilter,\r
338                                         uint16_t usTopicFilterLength,\r
339                                         MQTTPublishCallback_t xCallback,\r
340                                         void * pvParameter )\r
341     {\r
342         MQTTCallback_t * pxCallback = NULL;\r
343         BaseType_t xStatus = pdFAIL, i = 0;\r
344 \r
345         /* Prevent other tasks from modifying stored callbacks while this function\r
346          * runs. */\r
347         if( xSemaphoreTake( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ),\r
348                             portMAX_DELAY ) == pdTRUE )\r
349         {\r
350             /* Check if the topic filter already has an entry. */\r
351             pxCallback = prvFindCallback( pxConnection, pcTopicFilter, usTopicFilterLength );\r
352 \r
353             if( pxCallback == NULL )\r
354             {\r
355                 /* If no entry was found, find a free entry. */\r
356                 for( i = 0; i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; i++ )\r
357                 {\r
358                     if( pxConnection->xCallbacks[ i ].xInUse == pdFALSE )\r
359                     {\r
360                         pxConnection->xCallbacks[ i ].xInUse = pdTRUE;\r
361                         pxCallback = &( pxConnection->xCallbacks[ i ] );\r
362                         break;\r
363                     }\r
364                 }\r
365             }\r
366 \r
367             /* Set the members of the callback entry. */\r
368             if( i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS )\r
369             {\r
370                 pxCallback->pvParameter = pvParameter;\r
371                 pxCallback->usTopicFilterLength = usTopicFilterLength;\r
372                 pxCallback->xFunction = xCallback;\r
373                 ( void ) strncpy( pxCallback->pcTopicFilter, pcTopicFilter, usTopicFilterLength );\r
374                 xStatus = pdPASS;\r
375             }\r
376 \r
377             ( void ) xSemaphoreGive( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ) );\r
378         }\r
379 \r
380         return xStatus;\r
381     }\r
382 \r
383 /*-----------------------------------------------------------*/\r
384 \r
385     static MQTTCallback_t * prvFindCallback( MQTTConnection_t * const pxConnection,\r
386                                              const char * const pcTopicFilter,\r
387                                              uint16_t usTopicFilterLength )\r
388     {\r
389         BaseType_t i = 0;\r
390         MQTTCallback_t * pxResult = NULL;\r
391 \r
392         /* Search the callback conversion table for the topic filter. */\r
393         for( i = 0; i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; i++ )\r
394         {\r
395             if( ( pxConnection->xCallbacks[ i ].usTopicFilterLength == usTopicFilterLength ) &&\r
396                 ( strncmp( pxConnection->xCallbacks[ i ].pcTopicFilter,\r
397                            pcTopicFilter,\r
398                            usTopicFilterLength ) == 0 ) )\r
399             {\r
400                 pxResult = &( pxConnection->xCallbacks[ i ] );\r
401                 break;\r
402             }\r
403         }\r
404 \r
405         return pxResult;\r
406     }\r
407 \r
408 /*-----------------------------------------------------------*/\r
409 \r
410     static void prvRemoveCallback( MQTTConnection_t * const pxConnection,\r
411                                    const char * const pcTopicFilter,\r
412                                    uint16_t usTopicFilterLength )\r
413     {\r
414         MQTTCallback_t * pxCallback = NULL;\r
415 \r
416         /* Prevent other tasks from modifying stored callbacks while this function\r
417          * runs. */\r
418         if( xSemaphoreTake( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ),\r
419                             portMAX_DELAY ) == pdTRUE )\r
420         {\r
421             /* Find the given topic filter. */\r
422             pxCallback = prvFindCallback( pxConnection, pcTopicFilter, usTopicFilterLength );\r
423 \r
424             if( pxCallback != NULL )\r
425             {\r
426                 /* Clear the callback entry. */\r
427                 mqttconfigASSERT( pxCallback->xInUse == pdTRUE );\r
428                 ( void ) memset( pxCallback, 0x00, sizeof( MQTTCallback_t ) );\r
429             }\r
430 \r
431             ( void ) xSemaphoreGive( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ) );\r
432         }\r
433     }\r
434 #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */\r
435 \r
436 /*-----------------------------------------------------------*/\r
437 \r
438 IotMqttConnection_t MQTT_AGENT_Getv2Connection( MQTTAgentHandle_t xMQTTHandle )\r
439 {\r
440     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;\r
441 \r
442     return pxConnection->xMQTTConnection;\r
443 }\r
444 \r
445 /*-----------------------------------------------------------*/\r
446 \r
447 BaseType_t MQTT_AGENT_Init( void )\r
448 {\r
449     BaseType_t xStatus = pdFALSE;\r
450 \r
451     /* Call the initialization function of MQTT v2. */\r
452     if( IotMqtt_Init() == IOT_MQTT_SUCCESS )\r
453     {\r
454         xStatus = pdTRUE;\r
455     }\r
456 \r
457     return xStatus;\r
458 }\r
459 \r
460 /*-----------------------------------------------------------*/\r
461 \r
462 MQTTAgentReturnCode_t MQTT_AGENT_Create( MQTTAgentHandle_t * const pxMQTTHandle )\r
463 {\r
464     MQTTConnection_t * pxNewConnection = NULL;\r
465     MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;\r
466 \r
467     /* Check how many brokers are available; fail if all brokers are in use. */\r
468     taskENTER_CRITICAL();\r
469     {\r
470         if( uxAvailableBrokers == 0 )\r
471         {\r
472             xStatus = eMQTTAgentFailure;\r
473         }\r
474         else\r
475         {\r
476             uxAvailableBrokers--;\r
477             mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );\r
478         }\r
479     }\r
480     taskEXIT_CRITICAL();\r
481 \r
482     /* Allocate memory for an MQTT connection. */\r
483     if( xStatus == eMQTTAgentSuccess )\r
484     {\r
485         pxNewConnection = pvPortMalloc( sizeof( MQTTConnection_t ) );\r
486 \r
487         if( pxNewConnection == NULL )\r
488         {\r
489             xStatus = eMQTTAgentFailure;\r
490 \r
491             taskENTER_CRITICAL();\r
492             {\r
493                 uxAvailableBrokers++;\r
494                 mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );\r
495             }\r
496             taskEXIT_CRITICAL();\r
497         }\r
498         else\r
499         {\r
500             ( void ) memset( pxNewConnection, 0x00, sizeof( MQTTConnection_t ) );\r
501             pxNewConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;\r
502         }\r
503     }\r
504 \r
505     /* Create the connection mutex and set the output parameter. */\r
506     if( xStatus == eMQTTAgentSuccess )\r
507     {\r
508         ( void ) xSemaphoreCreateMutexStatic( &( pxNewConnection->xConnectionMutex ) );\r
509         *pxMQTTHandle = ( MQTTAgentHandle_t ) pxNewConnection;\r
510     }\r
511 \r
512     return xStatus;\r
513 }\r
514 \r
515 /*-----------------------------------------------------------*/\r
516 \r
517 MQTTAgentReturnCode_t MQTT_AGENT_Delete( MQTTAgentHandle_t xMQTTHandle )\r
518 {\r
519     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;\r
520 \r
521     /* Clean up any allocated MQTT or network resources. */\r
522     if( pxConnection->xMQTTConnection != IOT_MQTT_CONNECTION_INITIALIZER )\r
523     {\r
524         IotMqtt_Disconnect( pxConnection->xMQTTConnection, IOT_MQTT_FLAG_CLEANUP_ONLY );\r
525         pxConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;\r
526     }\r
527 \r
528     /* Free memory used by the MQTT connection. */\r
529     vPortFree( pxConnection );\r
530 \r
531     /* Increment the number of available brokers. */\r
532     taskENTER_CRITICAL();\r
533     {\r
534         uxAvailableBrokers++;\r
535         mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );\r
536     }\r
537     taskEXIT_CRITICAL();\r
538 \r
539     return eMQTTAgentSuccess;\r
540 }\r
541 \r
542 /*-----------------------------------------------------------*/\r
543 \r
544 MQTTAgentReturnCode_t MQTT_AGENT_Connect( MQTTAgentHandle_t xMQTTHandle,\r
545                                           const MQTTAgentConnectParams_t * const pxConnectParams,\r
546                                           TickType_t xTimeoutTicks )\r
547 {\r
548     MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;\r
549     IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;\r
550     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;\r
551     IotNetworkServerInfo_t xServerInfo = { 0 };\r
552     IotNetworkCredentials_t xCredentials = AWS_IOT_NETWORK_CREDENTIALS_AFR_INITIALIZER, * pxCredentials = NULL;\r
553     IotMqttNetworkInfo_t xNetworkInfo = IOT_MQTT_NETWORK_INFO_INITIALIZER;\r
554     IotMqttConnectInfo_t xMqttConnectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;\r
555 \r
556     /* Copy the global callback and parameter. */\r
557     pxConnection->pxCallback = pxConnectParams->pxCallback;\r
558     pxConnection->pvUserData = pxConnectParams->pvUserData;\r
559 \r
560     /* Set the TLS info for a secured connection. */\r
561     if( ( pxConnectParams->xSecuredConnection == pdTRUE ) ||\r
562         ( ( pxConnectParams->xFlags & mqttagentREQUIRE_TLS ) == mqttagentREQUIRE_TLS ) )\r
563     {\r
564         pxCredentials = &xCredentials;\r
565 \r
566         /* Set the server certificate. Other credentials are set by the initializer. */\r
567         xCredentials.pRootCa = pxConnectParams->pcCertificate;\r
568         xCredentials.rootCaSize = ( size_t ) pxConnectParams->ulCertificateSize;\r
569 \r
570         /* Disable ALPN if requested. */\r
571         if( ( pxConnectParams->xFlags & mqttagentUSE_AWS_IOT_ALPN_443 ) == 0 )\r
572         {\r
573             xCredentials.pAlpnProtos = NULL;\r
574         }\r
575 \r
576         /* Disable SNI if requested. */\r
577         if( ( pxConnectParams->xURLIsIPAddress == pdTRUE ) ||\r
578             ( ( pxConnectParams->xFlags & mqttagentURL_IS_IP_ADDRESS ) == mqttagentURL_IS_IP_ADDRESS ) )\r
579         {\r
580             xCredentials.disableSni = true;\r
581         }\r
582     }\r
583 \r
584     /* Set the server info. */\r
585     xServerInfo.pHostName = pxConnectParams->pcURL;\r
586     xServerInfo.port = pxConnectParams->usPort;\r
587 \r
588     /* Set the members of the network info. */\r
589     xNetworkInfo.createNetworkConnection = true;\r
590     xNetworkInfo.u.setup.pNetworkServerInfo = &xServerInfo;\r
591     xNetworkInfo.u.setup.pNetworkCredentialInfo = pxCredentials;\r
592     xNetworkInfo.pNetworkInterface = IOT_NETWORK_INTERFACE_AFR;\r
593 \r
594     if( pxConnectParams->pxCallback != NULL )\r
595     {\r
596         xNetworkInfo.disconnectCallback.function = prvDisconnectCallbackWrapper;\r
597         xNetworkInfo.disconnectCallback.pCallbackContext = pxConnection;\r
598     }\r
599 \r
600     /* Set the members of the MQTT connect info. */\r
601     xMqttConnectInfo.awsIotMqttMode = true;\r
602     xMqttConnectInfo.cleanSession = true;\r
603     xMqttConnectInfo.pClientIdentifier = ( const char * ) ( pxConnectParams->pucClientId );\r
604     xMqttConnectInfo.clientIdentifierLength = pxConnectParams->usClientIdLength;\r
605     xMqttConnectInfo.keepAliveSeconds = mqttconfigKEEP_ALIVE_INTERVAL_SECONDS;\r
606 \r
607     /* Call MQTT v2's CONNECT function. */\r
608     xMqttStatus = IotMqtt_Connect( &xNetworkInfo,\r
609                                    &xMqttConnectInfo,\r
610                                    mqttTICKS_TO_MS( xTimeoutTicks ),\r
611                                    &( pxConnection->xMQTTConnection ) );\r
612     xStatus = prvConvertReturnCode( xMqttStatus );\r
613 \r
614     /* Add a subscription to "#" to support the global callback when subscription\r
615      * manager is disabled. */\r
616     #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 0 )\r
617         IotMqttSubscription_t xGlobalSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;\r
618         IotMqttReference_t xGlobalSubscriptionRef = IOT_MQTT_REFERENCE_INITIALIZER;\r
619 \r
620         if( xStatus == eMQTTAgentSuccess )\r
621         {\r
622             xGlobalSubscription.pTopicFilter = "#";\r
623             xGlobalSubscription.topicFilterLength = 1;\r
624             xGlobalSubscription.qos = 0;\r
625             xGlobalSubscription.callback.param1 = pxConnection;\r
626             xGlobalSubscription.callback.function = prvPublishCallbackWrapper;\r
627 \r
628             xMqttStatus = IotMqtt_Subscribe( pxConnection->xMQTTConnection,\r
629                                              &xGlobalSubscription,\r
630                                              1,\r
631                                              IOT_MQTT_FLAG_WAITABLE,\r
632                                              NULL,\r
633                                              &xGlobalSubscriptionRef );\r
634             xStatus = prvConvertReturnCode( xMqttStatus );\r
635         }\r
636 \r
637         /* Wait for the subscription to "#" to complete. */\r
638         if( xStatus == eMQTTAgentSuccess )\r
639         {\r
640             xMqttStatus = IotMqtt_Wait( xGlobalSubscriptionRef,\r
641                                         mqttTICKS_TO_MS( xTimeoutTicks ) );\r
642             xStatus = prvConvertReturnCode( xMqttStatus );\r
643         }\r
644     #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */\r
645 \r
646     return xStatus;\r
647 }\r
648 \r
649 /*-----------------------------------------------------------*/\r
650 \r
651 MQTTAgentReturnCode_t MQTT_AGENT_Disconnect( MQTTAgentHandle_t xMQTTHandle,\r
652                                              TickType_t xTimeoutTicks )\r
653 {\r
654     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;\r
655 \r
656     /* MQTT v2's DISCONNECT function does not have a timeout argument. */\r
657     ( void ) xTimeoutTicks;\r
658 \r
659     /* Check that the connection is established. */\r
660     if( pxConnection->xMQTTConnection != IOT_MQTT_CONNECTION_INITIALIZER )\r
661     {\r
662         /* Call MQTT v2's DISCONNECT function. */\r
663         IotMqtt_Disconnect( pxConnection->xMQTTConnection,\r
664                             0 );\r
665         pxConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;\r
666     }\r
667 \r
668     return eMQTTAgentSuccess;\r
669 }\r
670 \r
671 /*-----------------------------------------------------------*/\r
672 \r
673 MQTTAgentReturnCode_t MQTT_AGENT_Subscribe( MQTTAgentHandle_t xMQTTHandle,\r
674                                             const MQTTAgentSubscribeParams_t * const pxSubscribeParams,\r
675                                             TickType_t xTimeoutTicks )\r
676 {\r
677     MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;\r
678     IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;\r
679     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;\r
680     IotMqttSubscription_t xSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;\r
681 \r
682     /* Store the topic filter if subscription management is enabled. */\r
683     #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )\r
684         /* Check topic filter length. */\r
685         if( pxSubscribeParams->usTopicLength > mqttconfigSUBSCRIPTION_MANAGER_MAX_TOPIC_LENGTH )\r
686         {\r
687             xStatus = eMQTTAgentFailure;\r
688         }\r
689 \r
690         /* Store the subscription. */\r
691         if( prvStoreCallback( pxConnection,\r
692                               ( const char * ) pxSubscribeParams->pucTopic,\r
693                               pxSubscribeParams->usTopicLength,\r
694                               pxSubscribeParams->pxPublishCallback,\r
695                               pxSubscribeParams->pvPublishCallbackContext ) == pdFAIL )\r
696         {\r
697             xStatus = eMQTTAgentFailure;\r
698         }\r
699     #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */\r
700 \r
701     /* Call MQTT v2 blocking SUBSCRIBE function. */\r
702     if( xStatus == eMQTTAgentSuccess )\r
703     {\r
704         /* Set the members of the MQTT subscription. */\r
705         xSubscription.pTopicFilter = ( const char * ) ( pxSubscribeParams->pucTopic );\r
706         xSubscription.topicFilterLength = pxSubscribeParams->usTopicLength;\r
707         xSubscription.qos = ( IotMqttQos_t ) pxSubscribeParams->xQoS;\r
708         xSubscription.callback.pCallbackContext = pxConnection;\r
709         xSubscription.callback.function = prvPublishCallbackWrapper;\r
710 \r
711         xMqttStatus = IotMqtt_TimedSubscribe( pxConnection->xMQTTConnection,\r
712                                               &xSubscription,\r
713                                               1,\r
714                                               0,\r
715                                               mqttTICKS_TO_MS( xTimeoutTicks ) );\r
716         xStatus = prvConvertReturnCode( xMqttStatus );\r
717     }\r
718 \r
719     return xStatus;\r
720 }\r
721 \r
722 /*-----------------------------------------------------------*/\r
723 \r
724 MQTTAgentReturnCode_t MQTT_AGENT_Unsubscribe( MQTTAgentHandle_t xMQTTHandle,\r
725                                               const MQTTAgentUnsubscribeParams_t * const pxUnsubscribeParams,\r
726                                               TickType_t xTimeoutTicks )\r
727 {\r
728     IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;\r
729     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;\r
730     IotMqttSubscription_t xSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;\r
731 \r
732     /* Remove any subscription callback that may be registered. */\r
733     #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )\r
734         prvRemoveCallback( pxConnection,\r
735                            ( const char * ) ( pxUnsubscribeParams->pucTopic ),\r
736                            pxUnsubscribeParams->usTopicLength );\r
737     #endif\r
738 \r
739     /* Set the members of the subscription to remove. */\r
740     xSubscription.pTopicFilter = ( const char * ) ( pxUnsubscribeParams->pucTopic );\r
741     xSubscription.topicFilterLength = pxUnsubscribeParams->usTopicLength;\r
742     xSubscription.callback.pCallbackContext = pxConnection;\r
743     xSubscription.callback.function = prvPublishCallbackWrapper;\r
744 \r
745     /* Call MQTT v2 blocking UNSUBSCRIBE function. */\r
746     xMqttStatus = IotMqtt_TimedUnsubscribe( pxConnection->xMQTTConnection,\r
747                                             &xSubscription,\r
748                                             1,\r
749                                             0,\r
750                                             mqttTICKS_TO_MS( xTimeoutTicks ) );\r
751 \r
752     return prvConvertReturnCode( xMqttStatus );\r
753 }\r
754 \r
755 /*-----------------------------------------------------------*/\r
756 \r
757 MQTTAgentReturnCode_t MQTT_AGENT_Publish( MQTTAgentHandle_t xMQTTHandle,\r
758                                           const MQTTAgentPublishParams_t * const pxPublishParams,\r
759                                           TickType_t xTimeoutTicks )\r
760 {\r
761     IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;\r
762     MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;\r
763     IotMqttPublishInfo_t xPublishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;\r
764 \r
765     /* Set the members of the publish info. */\r
766     xPublishInfo.pTopicName = ( const char * ) pxPublishParams->pucTopic;\r
767     xPublishInfo.topicNameLength = pxPublishParams->usTopicLength;\r
768     xPublishInfo.qos = ( IotMqttQos_t ) pxPublishParams->xQoS;\r
769     xPublishInfo.pPayload = ( const void * ) pxPublishParams->pvData;\r
770     xPublishInfo.payloadLength = pxPublishParams->ulDataLength;\r
771 \r
772     /* Call the MQTT v2 blocking PUBLISH function. */\r
773     xMqttStatus = IotMqtt_TimedPublish( pxConnection->xMQTTConnection,\r
774                                         &xPublishInfo,\r
775                                         0,\r
776                                         mqttTICKS_TO_MS( xTimeoutTicks ) );\r
777 \r
778     return prvConvertReturnCode( xMqttStatus );\r
779 }\r
780 \r
781 /*-----------------------------------------------------------*/\r
782 \r
783 MQTTAgentReturnCode_t MQTT_AGENT_ReturnBuffer( MQTTAgentHandle_t xMQTTHandle,\r
784                                                MQTTBufferHandle_t xBufferHandle )\r
785 {\r
786     ( void ) xMQTTHandle;\r
787 \r
788     /* Free the MQTT buffer. */\r
789     vPortFree( xBufferHandle );\r
790 \r
791     return eMQTTAgentSuccess;\r
792 }\r
793 \r
794 /*-----------------------------------------------------------*/\r