2 * Amazon FreeRTOS MQTT V2.0.0
\r
3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://aws.amazon.com/freertos
\r
23 * http://www.FreeRTOS.org
\r
27 * @file iot_mqtt_agent.c
\r
28 * @brief MQTT Agent implementation. Provides backwards compatibility between
\r
29 * MQTT v2 and MQTT v1.
\r
32 /* The config header is always included first. */
\r
33 #include "iot_config.h"
\r
35 /* Standard includes. */
\r
38 /* FreeRTOS includes. */
\r
39 #include "FreeRTOS.h"
\r
42 /* MQTT v1 includes. */
\r
43 #include "iot_mqtt_agent.h"
\r
44 #include "iot_mqtt_agent_config.h"
\r
45 #include "iot_mqtt_agent_config_defaults.h"
\r
47 /* MQTT v2 include. */
\r
48 #include "iot_mqtt.h"
\r
50 /* Platform network include. */
\r
51 #include "platform/iot_network_freertos.h"
\r
53 /*-----------------------------------------------------------*/
\r
56 * @brief Converts FreeRTOS ticks to milliseconds.
\r
58 #define mqttTICKS_TO_MS( xTicks ) ( xTicks * 1000 / configTICK_RATE_HZ )
\r
60 /*-----------------------------------------------------------*/
\r
63 * @brief Stores data to convert between the MQTT v1 subscription callback
\r
64 * and the MQTT v2 subscription callback.
\r
66 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
\r
67 typedef struct MQTTCallback
\r
69 BaseType_t xInUse; /**< Whether this instance is in-use. */
\r
70 MQTTPublishCallback_t xFunction; /**< MQTT v1 callback function. */
\r
71 void * pvParameter; /**< Parameter to xFunction. */
\r
73 uint16_t usTopicFilterLength; /**< Length of pcTopicFilter. */
\r
74 char pcTopicFilter[ mqttconfigSUBSCRIPTION_MANAGER_MAX_TOPIC_LENGTH ]; /**< Topic filter. */
\r
79 * @brief Stores data on an active MQTT connection.
\r
81 typedef struct MQTTConnection
\r
83 IotMqttConnection_t xMQTTConnection; /**< MQTT v2 connection handle. */
\r
84 MQTTAgentCallback_t pxCallback; /**< MQTT v1 global callback. */
\r
85 void * pvUserData; /**< Parameter to pxCallback. */
\r
86 StaticSemaphore_t xConnectionMutex; /**< Protects from concurrent accesses. */
\r
87 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
\r
88 MQTTCallback_t xCallbacks /**< Conversion table of MQTT v1 to MQTT v2 subscription callbacks. */
\r
89 [ mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS ];
\r
93 /*-----------------------------------------------------------*/
\r
96 * @brief Convert an MQTT v2 return code to an MQTT v1 return code.
\r
98 * @param[in] xMqttStatus The MQTT v2 return code.
\r
100 * @return An equivalent MQTT v1 return code.
\r
102 static inline MQTTAgentReturnCode_t prvConvertReturnCode( IotMqttError_t xMqttStatus );
\r
105 * @brief Wraps an MQTT v1 publish callback.
\r
107 * @param[in] pvParameter The MQTT connection.
\r
108 * @param[in] pxPublish Information about the incoming publish.
\r
110 static void prvPublishCallbackWrapper( void * pvParameter,
\r
111 IotMqttCallbackParam_t * const pxPublish );
\r
114 * @brief Wraps an MQTT v1 disconnect callback.
\r
116 * @param[in] pvCallbackContext The MQTT connection.
\r
117 * @param[in] pxDisconnect Information about the disconnect.
\r
119 static void prvDisconnectCallbackWrapper( void * pvParameter,
\r
120 IotMqttCallbackParam_t * pxDisconnect );
\r
122 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
\r
125 * @brief Store an MQTT v1 callback in the conversion table.
\r
127 * @param[in] pxConnection Where to store the callback.
\r
128 * @param[in] pcTopicFilter Topic filter to store.
\r
129 * @param[in] usTopicFilterLength Length of pcTopicFilter.
\r
130 * @param[in] xCallback MQTT v1 callback to store.
\r
131 * @param[in] pvParameter Parameter to xCallback.
\r
133 * @return pdPASS if the callback was successfully stored; pdFAIL otherwise.
\r
135 static BaseType_t prvStoreCallback( MQTTConnection_t * const pxConnection,
\r
136 const char * const pcTopicFilter,
\r
137 uint16_t usTopicFilterLength,
\r
138 MQTTPublishCallback_t xCallback,
\r
139 void * pvParameter );
\r
142 * @brief Search the callback conversion table for the given topic filter.
\r
144 * @param[in] pxConnection The connection containing the conversion table.
\r
145 * @param[in] pcTopicFilter The topic filter to search for.
\r
146 * @param[in] usTopicFilterLength The length of pcTopicFilter.
\r
148 * @return A pointer to the callback entry if found; NULL otherwise.
\r
149 * @note This function should be called with pxConnection->xConnectionMutex
\r
152 static MQTTCallback_t * prvFindCallback( MQTTConnection_t * const pxConnection,
\r
153 const char * const pcTopicFilter,
\r
154 uint16_t usTopicFilterLength );
\r
157 * @brief Remove a topic filter from the callback conversion table.
\r
159 * @param[in] pxConnection The connection containing the conversion table.
\r
160 * @param[in] pcTopicFilter The topic filter to remove.
\r
161 * @param[in] usTopicFilterLength The length of pcTopic.
\r
163 static void prvRemoveCallback( MQTTConnection_t * const pxConnection,
\r
164 const char * const pcTopicFilter,
\r
165 uint16_t usTopicFilterLength );
\r
166 #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
\r
168 /*-----------------------------------------------------------*/
\r
171 * @brief The number of available MQTT brokers, controlled by the constant
\r
172 * mqttconfigMAX_BROKERS;
\r
174 static UBaseType_t uxAvailableBrokers = mqttconfigMAX_BROKERS;
\r
176 /*-----------------------------------------------------------*/
\r
178 static inline MQTTAgentReturnCode_t prvConvertReturnCode( IotMqttError_t xMqttStatus )
\r
180 MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
\r
182 switch( xMqttStatus )
\r
184 case IOT_MQTT_SUCCESS:
\r
185 case IOT_MQTT_STATUS_PENDING:
\r
186 xStatus = eMQTTAgentSuccess;
\r
189 case IOT_MQTT_TIMEOUT:
\r
190 xStatus = eMQTTAgentTimeout;
\r
194 xStatus = eMQTTAgentFailure;
\r
201 /*-----------------------------------------------------------*/
\r
203 static void prvPublishCallbackWrapper( void * pvParameter,
\r
204 IotMqttCallbackParam_t * const pxPublish )
\r
206 BaseType_t xStatus = pdPASS;
\r
207 size_t xBufferSize = 0;
\r
208 uint8_t * pucMqttBuffer = NULL;
\r
209 MQTTBool_t xCallbackReturn = eMQTTFalse;
\r
210 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) pvParameter;
\r
211 MQTTAgentCallbackParams_t xPublishData = { .xMQTTEvent = eMQTTAgentPublish };
\r
213 /* Calculate the size of the MQTT buffer that must be allocated. */
\r
214 if( xStatus == pdPASS )
\r
216 xBufferSize = pxPublish->u.message.info.topicNameLength +
\r
217 pxPublish->u.message.info.payloadLength;
\r
219 /* Check for overflow. */
\r
220 if( ( xBufferSize < pxPublish->u.message.info.topicNameLength ) ||
\r
221 ( xBufferSize < pxPublish->u.message.info.payloadLength ) )
\r
223 mqttconfigDEBUG_LOG( ( "Incoming PUBLISH message and topic name length too large.\r\n" ) );
\r
228 /* Allocate an MQTT buffer for the callback. */
\r
229 if( xStatus == pdPASS )
\r
231 pucMqttBuffer = pvPortMalloc( xBufferSize );
\r
233 if( pucMqttBuffer == NULL )
\r
235 mqttconfigDEBUG_LOG( ( "Failed to allocate memory for MQTT buffer.\r\n" ) );
\r
240 /* Copy the topic name and payload. The topic name and payload must be
\r
241 * copied in case the user decides to take ownership of the MQTT buffer.
\r
242 * The original buffer containing the MQTT topic name and payload may
\r
243 * contain further unprocessed packets and must remain property of the
\r
244 * MQTT library. Therefore, the topic name and payload are copied into
\r
245 * another buffer for the user. */
\r
246 ( void ) memcpy( pucMqttBuffer,
\r
247 pxPublish->u.message.info.pTopicName,
\r
248 pxPublish->u.message.info.topicNameLength );
\r
249 ( void ) memcpy( pucMqttBuffer + pxPublish->u.message.info.topicNameLength,
\r
250 pxPublish->u.message.info.pPayload,
\r
251 pxPublish->u.message.info.payloadLength );
\r
253 /* Set the members of the callback parameter. */
\r
254 xPublishData.xMQTTEvent = eMQTTAgentPublish;
\r
255 xPublishData.u.xPublishData.pucTopic = pucMqttBuffer;
\r
256 xPublishData.u.xPublishData.usTopicLength = pxPublish->u.message.info.topicNameLength;
\r
257 xPublishData.u.xPublishData.pvData = pucMqttBuffer + pxPublish->u.message.info.topicNameLength;
\r
258 xPublishData.u.xPublishData.ulDataLength = ( uint32_t ) pxPublish->u.message.info.payloadLength;
\r
259 xPublishData.u.xPublishData.xQos = ( MQTTQoS_t ) pxPublish->u.message.info.qos;
\r
260 xPublishData.u.xPublishData.xBuffer = pucMqttBuffer;
\r
264 if( xStatus == pdPASS )
\r
266 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
\r
267 /* When subscription management is enabled, search for a matching subscription. */
\r
268 MQTTCallback_t * pxCallbackEntry = prvFindCallback( pxConnection,
\r
269 pxPublish->u.message.pTopicFilter,
\r
270 pxPublish->u.message.topicFilterLength );
\r
272 /* Check if a matching MQTT v1 subscription was found. */
\r
273 if( pxCallbackEntry != NULL )
\r
275 /* Invoke the topic-specific callback if it exists. */
\r
276 if( pxCallbackEntry->xFunction != NULL )
\r
278 xCallbackReturn = pxCallbackEntry->xFunction( pxCallbackEntry->pvParameter,
\r
279 &( xPublishData.u.xPublishData ) );
\r
283 /* Otherwise, invoke the global callback. */
\r
284 if( pxConnection->pxCallback != NULL )
\r
286 xCallbackReturn = ( MQTTBool_t ) pxConnection->pxCallback( pxConnection->pvUserData,
\r
291 #else /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
\r
293 /* When subscription management is disabled, invoke the global callback
\r
294 * if one exists. */
\r
296 /* When subscription management is disabled, the topic filter must be "#". */
\r
297 mqttconfigASSERT( *( xPublish.message.pTopicFilter ) == '#' );
\r
298 mqttconfigASSERT( xPublish.message.topicFilterLength == 1 );
\r
300 if( pxConnection->pxCallback != NULL )
\r
302 xCallbackReturn = ( MQTTBool_t ) pxConnection->pxCallback( pxConnection->pvUserData,
\r
305 #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
\r
308 /* Free the MQTT buffer if the user did not take ownership of it. */
\r
309 if( ( xCallbackReturn == eMQTTFalse ) && ( pucMqttBuffer != NULL ) )
\r
311 vPortFree( pucMqttBuffer );
\r
315 /*-----------------------------------------------------------*/
\r
317 static void prvDisconnectCallbackWrapper( void * pvParameter,
\r
318 IotMqttCallbackParam_t * pxDisconnect )
\r
320 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) pvParameter;
\r
321 MQTTAgentCallbackParams_t xCallbackParams = { .xMQTTEvent = eMQTTAgentDisconnect };
\r
323 ( void ) pxDisconnect;
\r
325 /* This function should only be called if a callback was set. */
\r
326 mqttconfigASSERT( pxConnection->pxCallback != NULL );
\r
328 /* Invoke the MQTT v1 callback. Ignore the return value. */
\r
329 pxConnection->pxCallback( pxConnection->pvUserData,
\r
330 &xCallbackParams );
\r
333 /*-----------------------------------------------------------*/
\r
335 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
\r
336 static BaseType_t prvStoreCallback( MQTTConnection_t * const pxConnection,
\r
337 const char * const pcTopicFilter,
\r
338 uint16_t usTopicFilterLength,
\r
339 MQTTPublishCallback_t xCallback,
\r
340 void * pvParameter )
\r
342 MQTTCallback_t * pxCallback = NULL;
\r
343 BaseType_t xStatus = pdFAIL, i = 0;
\r
345 /* Prevent other tasks from modifying stored callbacks while this function
\r
347 if( xSemaphoreTake( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ),
\r
348 portMAX_DELAY ) == pdTRUE )
\r
350 /* Check if the topic filter already has an entry. */
\r
351 pxCallback = prvFindCallback( pxConnection, pcTopicFilter, usTopicFilterLength );
\r
353 if( pxCallback == NULL )
\r
355 /* If no entry was found, find a free entry. */
\r
356 for( i = 0; i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; i++ )
\r
358 if( pxConnection->xCallbacks[ i ].xInUse == pdFALSE )
\r
360 pxConnection->xCallbacks[ i ].xInUse = pdTRUE;
\r
361 pxCallback = &( pxConnection->xCallbacks[ i ] );
\r
367 /* Set the members of the callback entry. */
\r
368 if( i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS )
\r
370 pxCallback->pvParameter = pvParameter;
\r
371 pxCallback->usTopicFilterLength = usTopicFilterLength;
\r
372 pxCallback->xFunction = xCallback;
\r
373 ( void ) strncpy( pxCallback->pcTopicFilter, pcTopicFilter, usTopicFilterLength );
\r
377 ( void ) xSemaphoreGive( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ) );
\r
383 /*-----------------------------------------------------------*/
\r
385 static MQTTCallback_t * prvFindCallback( MQTTConnection_t * const pxConnection,
\r
386 const char * const pcTopicFilter,
\r
387 uint16_t usTopicFilterLength )
\r
390 MQTTCallback_t * pxResult = NULL;
\r
392 /* Search the callback conversion table for the topic filter. */
\r
393 for( i = 0; i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; i++ )
\r
395 if( ( pxConnection->xCallbacks[ i ].usTopicFilterLength == usTopicFilterLength ) &&
\r
396 ( strncmp( pxConnection->xCallbacks[ i ].pcTopicFilter,
\r
398 usTopicFilterLength ) == 0 ) )
\r
400 pxResult = &( pxConnection->xCallbacks[ i ] );
\r
408 /*-----------------------------------------------------------*/
\r
410 static void prvRemoveCallback( MQTTConnection_t * const pxConnection,
\r
411 const char * const pcTopicFilter,
\r
412 uint16_t usTopicFilterLength )
\r
414 MQTTCallback_t * pxCallback = NULL;
\r
416 /* Prevent other tasks from modifying stored callbacks while this function
\r
418 if( xSemaphoreTake( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ),
\r
419 portMAX_DELAY ) == pdTRUE )
\r
421 /* Find the given topic filter. */
\r
422 pxCallback = prvFindCallback( pxConnection, pcTopicFilter, usTopicFilterLength );
\r
424 if( pxCallback != NULL )
\r
426 /* Clear the callback entry. */
\r
427 mqttconfigASSERT( pxCallback->xInUse == pdTRUE );
\r
428 ( void ) memset( pxCallback, 0x00, sizeof( MQTTCallback_t ) );
\r
431 ( void ) xSemaphoreGive( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ) );
\r
434 #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
\r
436 /*-----------------------------------------------------------*/
\r
438 IotMqttConnection_t MQTT_AGENT_Getv2Connection( MQTTAgentHandle_t xMQTTHandle )
\r
440 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
\r
442 return pxConnection->xMQTTConnection;
\r
445 /*-----------------------------------------------------------*/
\r
447 BaseType_t MQTT_AGENT_Init( void )
\r
449 BaseType_t xStatus = pdFALSE;
\r
451 /* Call the initialization function of MQTT v2. */
\r
452 if( IotMqtt_Init() == IOT_MQTT_SUCCESS )
\r
460 /*-----------------------------------------------------------*/
\r
462 MQTTAgentReturnCode_t MQTT_AGENT_Create( MQTTAgentHandle_t * const pxMQTTHandle )
\r
464 MQTTConnection_t * pxNewConnection = NULL;
\r
465 MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
\r
467 /* Check how many brokers are available; fail if all brokers are in use. */
\r
468 taskENTER_CRITICAL();
\r
470 if( uxAvailableBrokers == 0 )
\r
472 xStatus = eMQTTAgentFailure;
\r
476 uxAvailableBrokers--;
\r
477 mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
\r
480 taskEXIT_CRITICAL();
\r
482 /* Allocate memory for an MQTT connection. */
\r
483 if( xStatus == eMQTTAgentSuccess )
\r
485 pxNewConnection = pvPortMalloc( sizeof( MQTTConnection_t ) );
\r
487 if( pxNewConnection == NULL )
\r
489 xStatus = eMQTTAgentFailure;
\r
491 taskENTER_CRITICAL();
\r
493 uxAvailableBrokers++;
\r
494 mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
\r
496 taskEXIT_CRITICAL();
\r
500 ( void ) memset( pxNewConnection, 0x00, sizeof( MQTTConnection_t ) );
\r
501 pxNewConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
\r
505 /* Create the connection mutex and set the output parameter. */
\r
506 if( xStatus == eMQTTAgentSuccess )
\r
508 ( void ) xSemaphoreCreateMutexStatic( &( pxNewConnection->xConnectionMutex ) );
\r
509 *pxMQTTHandle = ( MQTTAgentHandle_t ) pxNewConnection;
\r
515 /*-----------------------------------------------------------*/
\r
517 MQTTAgentReturnCode_t MQTT_AGENT_Delete( MQTTAgentHandle_t xMQTTHandle )
\r
519 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
\r
521 /* Clean up any allocated MQTT or network resources. */
\r
522 if( pxConnection->xMQTTConnection != IOT_MQTT_CONNECTION_INITIALIZER )
\r
524 IotMqtt_Disconnect( pxConnection->xMQTTConnection, IOT_MQTT_FLAG_CLEANUP_ONLY );
\r
525 pxConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
\r
528 /* Free memory used by the MQTT connection. */
\r
529 vPortFree( pxConnection );
\r
531 /* Increment the number of available brokers. */
\r
532 taskENTER_CRITICAL();
\r
534 uxAvailableBrokers++;
\r
535 mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
\r
537 taskEXIT_CRITICAL();
\r
539 return eMQTTAgentSuccess;
\r
542 /*-----------------------------------------------------------*/
\r
544 MQTTAgentReturnCode_t MQTT_AGENT_Connect( MQTTAgentHandle_t xMQTTHandle,
\r
545 const MQTTAgentConnectParams_t * const pxConnectParams,
\r
546 TickType_t xTimeoutTicks )
\r
548 MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
\r
549 IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
\r
550 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
\r
551 IotNetworkServerInfo_t xServerInfo = { 0 };
\r
552 IotNetworkCredentials_t xCredentials = AWS_IOT_NETWORK_CREDENTIALS_AFR_INITIALIZER, * pxCredentials = NULL;
\r
553 IotMqttNetworkInfo_t xNetworkInfo = IOT_MQTT_NETWORK_INFO_INITIALIZER;
\r
554 IotMqttConnectInfo_t xMqttConnectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;
\r
556 /* Copy the global callback and parameter. */
\r
557 pxConnection->pxCallback = pxConnectParams->pxCallback;
\r
558 pxConnection->pvUserData = pxConnectParams->pvUserData;
\r
560 /* Set the TLS info for a secured connection. */
\r
561 if( ( pxConnectParams->xSecuredConnection == pdTRUE ) ||
\r
562 ( ( pxConnectParams->xFlags & mqttagentREQUIRE_TLS ) == mqttagentREQUIRE_TLS ) )
\r
564 pxCredentials = &xCredentials;
\r
566 /* Set the server certificate. Other credentials are set by the initializer. */
\r
567 xCredentials.pRootCa = pxConnectParams->pcCertificate;
\r
568 xCredentials.rootCaSize = ( size_t ) pxConnectParams->ulCertificateSize;
\r
570 /* Disable ALPN if requested. */
\r
571 if( ( pxConnectParams->xFlags & mqttagentUSE_AWS_IOT_ALPN_443 ) == 0 )
\r
573 xCredentials.pAlpnProtos = NULL;
\r
576 /* Disable SNI if requested. */
\r
577 if( ( pxConnectParams->xURLIsIPAddress == pdTRUE ) ||
\r
578 ( ( pxConnectParams->xFlags & mqttagentURL_IS_IP_ADDRESS ) == mqttagentURL_IS_IP_ADDRESS ) )
\r
580 xCredentials.disableSni = true;
\r
584 /* Set the server info. */
\r
585 xServerInfo.pHostName = pxConnectParams->pcURL;
\r
586 xServerInfo.port = pxConnectParams->usPort;
\r
588 /* Set the members of the network info. */
\r
589 xNetworkInfo.createNetworkConnection = true;
\r
590 xNetworkInfo.u.setup.pNetworkServerInfo = &xServerInfo;
\r
591 xNetworkInfo.u.setup.pNetworkCredentialInfo = pxCredentials;
\r
592 xNetworkInfo.pNetworkInterface = IOT_NETWORK_INTERFACE_AFR;
\r
594 if( pxConnectParams->pxCallback != NULL )
\r
596 xNetworkInfo.disconnectCallback.function = prvDisconnectCallbackWrapper;
\r
597 xNetworkInfo.disconnectCallback.pCallbackContext = pxConnection;
\r
600 /* Set the members of the MQTT connect info. */
\r
601 xMqttConnectInfo.awsIotMqttMode = true;
\r
602 xMqttConnectInfo.cleanSession = true;
\r
603 xMqttConnectInfo.pClientIdentifier = ( const char * ) ( pxConnectParams->pucClientId );
\r
604 xMqttConnectInfo.clientIdentifierLength = pxConnectParams->usClientIdLength;
\r
605 xMqttConnectInfo.keepAliveSeconds = mqttconfigKEEP_ALIVE_INTERVAL_SECONDS;
\r
607 /* Call MQTT v2's CONNECT function. */
\r
608 xMqttStatus = IotMqtt_Connect( &xNetworkInfo,
\r
610 mqttTICKS_TO_MS( xTimeoutTicks ),
\r
611 &( pxConnection->xMQTTConnection ) );
\r
612 xStatus = prvConvertReturnCode( xMqttStatus );
\r
614 /* Add a subscription to "#" to support the global callback when subscription
\r
615 * manager is disabled. */
\r
616 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 0 )
\r
617 IotMqttSubscription_t xGlobalSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
\r
618 IotMqttReference_t xGlobalSubscriptionRef = IOT_MQTT_REFERENCE_INITIALIZER;
\r
620 if( xStatus == eMQTTAgentSuccess )
\r
622 xGlobalSubscription.pTopicFilter = "#";
\r
623 xGlobalSubscription.topicFilterLength = 1;
\r
624 xGlobalSubscription.qos = 0;
\r
625 xGlobalSubscription.callback.param1 = pxConnection;
\r
626 xGlobalSubscription.callback.function = prvPublishCallbackWrapper;
\r
628 xMqttStatus = IotMqtt_Subscribe( pxConnection->xMQTTConnection,
\r
629 &xGlobalSubscription,
\r
631 IOT_MQTT_FLAG_WAITABLE,
\r
633 &xGlobalSubscriptionRef );
\r
634 xStatus = prvConvertReturnCode( xMqttStatus );
\r
637 /* Wait for the subscription to "#" to complete. */
\r
638 if( xStatus == eMQTTAgentSuccess )
\r
640 xMqttStatus = IotMqtt_Wait( xGlobalSubscriptionRef,
\r
641 mqttTICKS_TO_MS( xTimeoutTicks ) );
\r
642 xStatus = prvConvertReturnCode( xMqttStatus );
\r
644 #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
\r
649 /*-----------------------------------------------------------*/
\r
651 MQTTAgentReturnCode_t MQTT_AGENT_Disconnect( MQTTAgentHandle_t xMQTTHandle,
\r
652 TickType_t xTimeoutTicks )
\r
654 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
\r
656 /* MQTT v2's DISCONNECT function does not have a timeout argument. */
\r
657 ( void ) xTimeoutTicks;
\r
659 /* Check that the connection is established. */
\r
660 if( pxConnection->xMQTTConnection != IOT_MQTT_CONNECTION_INITIALIZER )
\r
662 /* Call MQTT v2's DISCONNECT function. */
\r
663 IotMqtt_Disconnect( pxConnection->xMQTTConnection,
\r
665 pxConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
\r
668 return eMQTTAgentSuccess;
\r
671 /*-----------------------------------------------------------*/
\r
673 MQTTAgentReturnCode_t MQTT_AGENT_Subscribe( MQTTAgentHandle_t xMQTTHandle,
\r
674 const MQTTAgentSubscribeParams_t * const pxSubscribeParams,
\r
675 TickType_t xTimeoutTicks )
\r
677 MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
\r
678 IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
\r
679 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
\r
680 IotMqttSubscription_t xSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
\r
682 /* Store the topic filter if subscription management is enabled. */
\r
683 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
\r
684 /* Check topic filter length. */
\r
685 if( pxSubscribeParams->usTopicLength > mqttconfigSUBSCRIPTION_MANAGER_MAX_TOPIC_LENGTH )
\r
687 xStatus = eMQTTAgentFailure;
\r
690 /* Store the subscription. */
\r
691 if( prvStoreCallback( pxConnection,
\r
692 ( const char * ) pxSubscribeParams->pucTopic,
\r
693 pxSubscribeParams->usTopicLength,
\r
694 pxSubscribeParams->pxPublishCallback,
\r
695 pxSubscribeParams->pvPublishCallbackContext ) == pdFAIL )
\r
697 xStatus = eMQTTAgentFailure;
\r
699 #endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
\r
701 /* Call MQTT v2 blocking SUBSCRIBE function. */
\r
702 if( xStatus == eMQTTAgentSuccess )
\r
704 /* Set the members of the MQTT subscription. */
\r
705 xSubscription.pTopicFilter = ( const char * ) ( pxSubscribeParams->pucTopic );
\r
706 xSubscription.topicFilterLength = pxSubscribeParams->usTopicLength;
\r
707 xSubscription.qos = ( IotMqttQos_t ) pxSubscribeParams->xQoS;
\r
708 xSubscription.callback.pCallbackContext = pxConnection;
\r
709 xSubscription.callback.function = prvPublishCallbackWrapper;
\r
711 xMqttStatus = IotMqtt_TimedSubscribe( pxConnection->xMQTTConnection,
\r
715 mqttTICKS_TO_MS( xTimeoutTicks ) );
\r
716 xStatus = prvConvertReturnCode( xMqttStatus );
\r
722 /*-----------------------------------------------------------*/
\r
724 MQTTAgentReturnCode_t MQTT_AGENT_Unsubscribe( MQTTAgentHandle_t xMQTTHandle,
\r
725 const MQTTAgentUnsubscribeParams_t * const pxUnsubscribeParams,
\r
726 TickType_t xTimeoutTicks )
\r
728 IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
\r
729 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
\r
730 IotMqttSubscription_t xSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
\r
732 /* Remove any subscription callback that may be registered. */
\r
733 #if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
\r
734 prvRemoveCallback( pxConnection,
\r
735 ( const char * ) ( pxUnsubscribeParams->pucTopic ),
\r
736 pxUnsubscribeParams->usTopicLength );
\r
739 /* Set the members of the subscription to remove. */
\r
740 xSubscription.pTopicFilter = ( const char * ) ( pxUnsubscribeParams->pucTopic );
\r
741 xSubscription.topicFilterLength = pxUnsubscribeParams->usTopicLength;
\r
742 xSubscription.callback.pCallbackContext = pxConnection;
\r
743 xSubscription.callback.function = prvPublishCallbackWrapper;
\r
745 /* Call MQTT v2 blocking UNSUBSCRIBE function. */
\r
746 xMqttStatus = IotMqtt_TimedUnsubscribe( pxConnection->xMQTTConnection,
\r
750 mqttTICKS_TO_MS( xTimeoutTicks ) );
\r
752 return prvConvertReturnCode( xMqttStatus );
\r
755 /*-----------------------------------------------------------*/
\r
757 MQTTAgentReturnCode_t MQTT_AGENT_Publish( MQTTAgentHandle_t xMQTTHandle,
\r
758 const MQTTAgentPublishParams_t * const pxPublishParams,
\r
759 TickType_t xTimeoutTicks )
\r
761 IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
\r
762 MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
\r
763 IotMqttPublishInfo_t xPublishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;
\r
765 /* Set the members of the publish info. */
\r
766 xPublishInfo.pTopicName = ( const char * ) pxPublishParams->pucTopic;
\r
767 xPublishInfo.topicNameLength = pxPublishParams->usTopicLength;
\r
768 xPublishInfo.qos = ( IotMqttQos_t ) pxPublishParams->xQoS;
\r
769 xPublishInfo.pPayload = ( const void * ) pxPublishParams->pvData;
\r
770 xPublishInfo.payloadLength = pxPublishParams->ulDataLength;
\r
772 /* Call the MQTT v2 blocking PUBLISH function. */
\r
773 xMqttStatus = IotMqtt_TimedPublish( pxConnection->xMQTTConnection,
\r
776 mqttTICKS_TO_MS( xTimeoutTicks ) );
\r
778 return prvConvertReturnCode( xMqttStatus );
\r
781 /*-----------------------------------------------------------*/
\r
783 MQTTAgentReturnCode_t MQTT_AGENT_ReturnBuffer( MQTTAgentHandle_t xMQTTHandle,
\r
784 MQTTBufferHandle_t xBufferHandle )
\r
786 ( void ) xMQTTHandle;
\r
788 /* Free the MQTT buffer. */
\r
789 vPortFree( xBufferHandle );
\r
791 return eMQTTAgentSuccess;
\r
794 /*-----------------------------------------------------------*/
\r