]> git.sur5r.net Git - freertos/blobdiff - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
Correct an err in queue.c introduced when previously updating behaviour when queue...
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_api.c
index 7df5326dcc6ab60acee35f704ba17771c934782a..0d6a259ed651c5dcfd3facf0b632990a0109a462 100644 (file)
@@ -94,7 +94,7 @@ static void _mqttSubscription_tryDestroy( void * pData );
 static void _mqttOperation_tryDestroy( void * pData );\r
 \r
 /**\r
- * @brief Create a keep-alive job for an MQTT connection.\r
+ * @brief Initialize the keep-alive operation for an MQTT connection.\r
  *\r
  * @param[in] pNetworkInfo User-provided network information for the new\r
  * connection.\r
@@ -103,9 +103,9 @@ static void _mqttOperation_tryDestroy( void * pData );
  *\r
  * @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
  */\r
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
-                                 uint16_t keepAliveSeconds,\r
-                                 _mqttConnection_t * pMqttConnection );\r
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                       uint16_t keepAliveSeconds,\r
+                                       _mqttConnection_t * pMqttConnection );\r
 \r
 /**\r
  * @brief Creates a new MQTT connection and initializes its members.\r
@@ -141,7 +141,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
                                            size_t subscriptionCount,\r
                                            uint32_t flags,\r
                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                           IotMqttOperation_t * pOperationReference );\r
+                                           IotMqttOperation_t * const pOperationReference );\r
 \r
 /*-----------------------------------------------------------*/\r
 \r
@@ -238,9 +238,9 @@ static void _mqttOperation_tryDestroy( void * pData )
 \r
 /*-----------------------------------------------------------*/\r
 \r
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
-                                 uint16_t keepAliveSeconds,\r
-                                 _mqttConnection_t * pMqttConnection )\r
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+                                       uint16_t keepAliveSeconds,\r
+                                       _mqttConnection_t * pMqttConnection )\r
 {\r
     bool status = true;\r
     IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
@@ -253,9 +253,12 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
     IotMqttError_t ( * serializePingreq )( uint8_t **,\r
                                            size_t * ) = _IotMqtt_SerializePingreq;\r
 \r
+    /* Set PINGREQ operation members. */\r
+    pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;\r
+\r
     /* Convert the keep-alive interval to milliseconds. */\r
-    pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;\r
-    pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+    pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;\r
+    pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;\r
 \r
     /* Choose a PINGREQ serializer function. */\r
     #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
@@ -277,8 +280,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
 \r
     /* Generate a PINGREQ packet. */\r
-    serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),\r
-                                        &( pMqttConnection->pingreqPacketSize ) );\r
+    serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),\r
+                                        &( pMqttConnection->pingreq.u.operation.packetSize ) );\r
 \r
     if( serializeStatus != IOT_MQTT_SUCCESS )\r
     {\r
@@ -291,8 +294,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
         /* Create the task pool job that processes keep-alive. */\r
         jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
                                            pMqttConnection,\r
-                                           &( pMqttConnection->keepAliveJobStorage ),\r
-                                           &( pMqttConnection->keepAliveJob ) );\r
+                                           &( pMqttConnection->pingreq.jobStorage ),\r
+                                           &( pMqttConnection->pingreq.job ) );\r
 \r
         /* Task pool job creation for a pre-allocated job should never fail.\r
          * Abort the program if it does. */\r
@@ -408,9 +411,9 @@ static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
     /* Check if keep-alive is active for this connection. */\r
     if( keepAliveSeconds != 0 )\r
     {\r
-        if( _createKeepAliveJob( pNetworkInfo,\r
-                                 keepAliveSeconds,\r
-                                 pMqttConnection ) == false )\r
+        if( _createKeepAliveOperation( pNetworkInfo,\r
+                                       keepAliveSeconds,\r
+                                       pMqttConnection ) == false )\r
         {\r
             IOT_SET_AND_GOTO_CLEANUP( false );\r
         }\r
@@ -471,17 +474,31 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
 {\r
     IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
 \r
+    /* Default free packet function. */\r
+    void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;\r
+\r
     /* Clean up keep-alive if still allocated. */\r
-    if( pMqttConnection->keepAliveMs != 0 )\r
+    if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
     {\r
         IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
 \r
-        _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+        /* Choose a function to free the packet. */\r
+        #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+            if( pMqttConnection->pSerializer != NULL )\r
+            {\r
+                if( pMqttConnection->pSerializer->freePacket != NULL )\r
+                {\r
+                    freePacket = pMqttConnection->pSerializer->freePacket;\r
+                }\r
+            }\r
+        #endif\r
+\r
+        freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
 \r
         /* Clear data about the keep-alive. */\r
-        pMqttConnection->keepAliveMs = 0;\r
-        pMqttConnection->pPingreqPacket = NULL;\r
-        pMqttConnection->pingreqPacketSize = 0;\r
+        pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
+        pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
+        pMqttConnection->pingreq.u.operation.packetSize = 0;\r
 \r
         /* Decrement reference count. */\r
         pMqttConnection->references--;\r
@@ -494,9 +511,9 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
     /* A connection to be destroyed should have no keep-alive and at most 1\r
      * reference. */\r
     IotMqtt_Assert( pMqttConnection->references <= 1 );\r
-    IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );\r
-    IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );\r
-    IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );\r
+    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );\r
+    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );\r
+    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );\r
 \r
     /* Remove all subscriptions. */\r
     IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
@@ -546,7 +563,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
                                            size_t subscriptionCount,\r
                                            uint32_t flags,\r
                                            const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                           IotMqttOperation_t * pOperationReference )\r
+                                           IotMqttOperation_t * const pOperationReference )\r
 {\r
     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
     _mqttOperation_t * pSubscriptionOperation = NULL;\r
@@ -666,7 +683,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
 \r
     /* Check the subscription operation data and set the operation type. */\r
     IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
-    IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );\r
+    IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );\r
     pSubscriptionOperation->u.operation.type = operation;\r
 \r
     /* Generate a subscription packet from the subscription list. */\r
@@ -1059,7 +1076,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
     IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
     IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
                     == IOT_MQTT_FLAG_WAITABLE );\r
-    IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+    IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
 \r
     /* Set the operation type. */\r
     pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
@@ -1150,13 +1167,13 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
     if( status == IOT_MQTT_SUCCESS )\r
     {\r
         /* Check if a keep-alive job should be scheduled. */\r
-        if( pNewMqttConnection->keepAliveMs != 0 )\r
+        if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
         {\r
             IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
 \r
             taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
-                                                           pNewMqttConnection->keepAliveJob,\r
-                                                           pNewMqttConnection->nextKeepAliveMs );\r
+                                                           pNewMqttConnection->pingreq.job,\r
+                                                           pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );\r
 \r
             if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
             {\r
@@ -1268,7 +1285,7 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
                 IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
                 IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
                                 == IOT_MQTT_FLAG_WAITABLE );\r
-                IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+                IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
 \r
                 /* Set the operation type. */\r
                 pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
@@ -1389,7 +1406,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
                                   size_t subscriptionCount,\r
                                   uint32_t flags,\r
                                   const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                  IotMqttOperation_t * pSubscribeOperation )\r
+                                  IotMqttOperation_t * const pSubscribeOperation )\r
 {\r
     return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
                                 mqttConnection,\r
@@ -1445,7 +1462,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
                                     size_t subscriptionCount,\r
                                     uint32_t flags,\r
                                     const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                    IotMqttOperation_t * pUnsubscribeOperation )\r
+                                    IotMqttOperation_t * const pUnsubscribeOperation )\r
 {\r
     return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
                                 mqttConnection,\r
@@ -1500,7 +1517,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
                                 const IotMqttPublishInfo_t * pPublishInfo,\r
                                 uint32_t flags,\r
                                 const IotMqttCallbackInfo_t * pCallbackInfo,\r
-                                IotMqttOperation_t * pPublishOperation )\r
+                                IotMqttOperation_t * const pPublishOperation )\r
 {\r
     IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
     _mqttOperation_t * pOperation = NULL;\r
@@ -1651,8 +1668,8 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
         /* A QoS 0 PUBLISH may not be retried. */\r
         if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
         {\r
-            pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;\r
-            pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;\r
+            pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;\r
+            pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;\r
         }\r
         else\r
         {\r